This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new 6fb02357 Implement transaction retrieval with sequence number,
integrate LevelDB for summary storage, and update server configuration settings.
6fb02357 is described below
commit 6fb02357e6934bf411156fd90a98bb7eed19d733
Author: harish876 <[email protected]>
AuthorDate: Wed Dec 31 02:08:22 2025 +0000
Implement transaction retrieval with sequence number, integrate LevelDB for
summary storage, and update server configuration settings.
---
.../graphql/service/http_server/crow_service.cpp | 28 ++
.../graphql/service/kv_service/resdb_kv_client.cpp | 15 ++
.../graphql/service/kv_service/resdb_kv_client.h | 11 +-
executor/kv/kv_executor.cpp | 1 +
.../consensus/execution/transaction_executor.cpp | 2 +-
platform/consensus/ordering/pbft/commitment.cpp | 2 +-
.../ordering/pbft/viewchange_manager_test.cpp | 1 -
platform/statistic/BUILD | 5 +
platform/statistic/stats.cpp | 283 +++++++++++++++------
platform/statistic/stats.h | 9 +-
service/kv/kv_service.cpp | 2 +-
service/tools/config/server/server.config | 8 +-
12 files changed, 283 insertions(+), 84 deletions(-)
diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp b/ecosystem/graphql/service/http_server/crow_service.cpp
index 03c738c8..c27f44f8 100644
--- a/ecosystem/graphql/service/http_server/crow_service.cpp
+++ b/ecosystem/graphql/service/http_server/crow_service.cpp
@@ -128,6 +128,34 @@ void CrowService::run() {
}
});
+ // Get value with sequence number of specific id
+ CROW_ROUTE(app, "/v2/transactions/<string>")
+ ([this](const crow::request &req, response &res, std::string id) {
+ auto result = kv_client_.GetWithSeq(id);
+ if (result != nullptr) {
+ LOG(INFO) << "client get value with seq = " << result->second
+ << ", seq = " << result->first;
+
+ // Send updated blocks list to websocket
+ if (users.size() > 0) {
+ for (auto u : users) u->send_text("Update blocks");
+ }
+
+ num_transactions_++;
+
+ crow::json::wvalue resp;
+ resp["value"] = result->second;
+ resp["seq"] = result->first;
+
+ res.set_header("Content-Type", "application/json");
+ res.end(resp.dump());
+ } else {
+ res.code = 500;
+ res.set_header("Content-Type", "text/plain");
+ res.end("get value with seq fail");
+ }
+ });
+
// Get values based on key range
CROW_ROUTE(app, "/v1/transactions/<string>/<string>")
([this](const crow::request &req, response &res, std::string min_id,
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
index 2273cf98..0baab27c 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
@@ -64,6 +64,21 @@ std::unique_ptr<std::string> ResDBKVClient::Get(const std::string &key) {
return std::make_unique<std::string>(response.value());
}
+std::unique_ptr<std::pair<int64_t, std::string>> ResDBKVClient::GetWithSeq(
+ const std::string &key) {
+ KVRequest request;
+ request.set_cmd(KVRequest::GET);
+ request.set_key(key);
+ KVResponse response;
+ int ret = SendRequest(request, &response);
+ if (ret != 0) {
+ LOG(ERROR) << "send request fail, ret:" << ret;
+ return nullptr;
+ }
+ return std::make_unique<std::pair<int64_t, std::string>>(
+ response.seq(), response.value());
+}
+
std::unique_ptr<std::string> ResDBKVClient::GetAllValues() {
KVRequest request;
request.set_cmd(KVRequest::GETALLVALUES);
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.h b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
index ac35afc0..20249b62 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.h
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
@@ -19,21 +19,28 @@
#pragma once
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
#include "interface/rdbc/transaction_constructor.h"
namespace sdk {
// ResDBKVClient to send data to the kv server.
class ResDBKVClient : public resdb::TransactionConstructor {
-public:
+ public:
ResDBKVClient(const resdb::ResDBConfig &config);
int Set(const std::string &key, const std::string &data);
int64_t SetWithSeq(const std::string &key, const std::string &data);
+ std::unique_ptr<std::pair<int64_t, std::string>> GetWithSeq(
+ const std::string &key);
std::unique_ptr<std::string> Get(const std::string &key);
std::unique_ptr<std::string> GetAllValues();
std::unique_ptr<std::string> GetRange(const std::string &min_key,
const std::string &max_key);
};
-} // namespace sdk
+} // namespace sdk
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index f6fe8b86..78ed5710 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -61,6 +61,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteRequest(
kv_response.set_seq(kv_request.seq());
} else if (kv_request.cmd() == KVRequest::GET) {
kv_response.set_value(Get(kv_request.key()));
+ kv_response.set_seq(kv_request.seq());
} else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
kv_response.set_value(GetAllValues());
} else if (kv_request.cmd() == KVRequest::GETRANGE) {
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 12f4b49a..f70be048 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -335,7 +335,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
response->set_seq(request->seq());
if (post_exec_func_) {
- post_exec_func_(std::move(request), std::move(response)); //TODO: request_tracking
+ post_exec_func_(std::move(request), std::move(response));
}
global_stats_->IncExecuteDone();
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 4a1aacc0..d75038aa 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -296,7 +296,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
message_manager_->AddConsensusMsg(context->signature,
std::move(request));
if (ret == CollectorResultCode::STATE_CHANGED) {
// LOG(ERROR)<<request->data().size();
- // global_stats_->GetTransactionDetails(request->data());
+ //global_stats_->GetTransactionDetails(request->data());
global_stats_->RecordStateTime("commit");
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
index 43b87541..1803dd0b 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
@@ -89,7 +89,6 @@ TEST_F(ViewChangeManagerTest, SendViewChange) {
request->set_hash(SignatureVerifier::CalculateHash("test_data"));
checkpoint_manager_->AddCommitData(std::move(request));
- // Wait a bit for the request to be processed (last_seq_ to be updated)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::promise<bool> propose_done;
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 3094840b..4c0f32bf 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -26,6 +26,10 @@ cc_library(
name = "stats",
srcs = ["stats.cpp"],
hdrs = ["stats.h"],
+ copts = select({
+ "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"],
+ "//conditions:default": [],
+ }),
deps = [
":prometheus_handler",
"//common:asio",
@@ -38,6 +42,7 @@ cc_library(
"//proto/kv:kv_cc_proto",
"//third_party:crow",
"//third_party:prometheus",
+ "//third_party:leveldb",
],
)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 2d6099c4..0afb219a 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -22,8 +22,12 @@
#include <glog/logging.h>
#include <ctime>
+#include <sstream>
#include "common/utils/utils.h"
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/write_batch.h"
#include "proto/kv/kv.pb.h"
namespace asio = boost::asio;
@@ -124,22 +128,88 @@ void Stats::CrowRoute() {
crow::SimpleApp app;
while (!stop_) {
try {
+ //Deprecated
CROW_ROUTE(app, "/consensus_data")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 1";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- // Send your response
- res.body = consensus_history_.dump();
- res.end();
- });
+ .methods("GET"_method)(
+ [this](const crow::request& req, crow::response& res) {
+ LOG(ERROR) << "API 1";
+ res.set_header("Access-Control-Allow-Origin", "*");
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS");
+ res.set_header("Access-Control-Allow-Headers",
+ "Content-Type, Authorization");
+
+ int limit = 10;
+ int page = 1;
+
+ std::string url = req.url;
+ size_t query_pos = url.find('?');
+ if (query_pos != std::string::npos) {
+ std::string query_string = url.substr(query_pos + 1);
+ std::istringstream iss(query_string);
+ std::string param;
+ while (std::getline(iss, param, '&')) {
+ size_t eq_pos = param.find('=');
+ if (eq_pos != std::string::npos) {
+ std::string key = param.substr(0, eq_pos);
+ std::string value = param.substr(eq_pos + 1);
+ if (key == "limit") {
+ try {
+ limit = std::stoi(value);
+ if (limit <= 0) limit = 10;
+ } catch (...) {
+ limit = 10;
+ }
+ } else if (key == "page") {
+ try {
+ page = std::stoi(value);
+ if (page <= 0) page = 1;
+ } catch (...) {
+ page = 1;
+ }
+ }
+ }
+ }
+ }
+
+ nlohmann::json result;
+ int total_count = 0;
+ int offset = (page - 1) * limit;
+ int current_index = 0;
+ int items_collected = 0;
+
+ if (summary_db_) {
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+ leveldb::Iterator* it =
+ summary_db_->NewIterator(leveldb::ReadOptions());
+
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ total_count++;
+ }
+
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ if (current_index >= offset && items_collected < limit) {
+ try {
+ result[it->key().ToString()] =
+ nlohmann::json::parse(it->value().ToString());
+ items_collected++;
+ } catch (...) {
+ res.code = 500;
+ result["error"] = "Failed to parse transaction data";
+ delete it;
+ res.body = result.dump();
+ res.end();
+ return;
+ }
+ }
+ current_index++;
+ }
+ delete it;
+ }
+
+ res.body = result.dump();
+ res.end();
+ });
CROW_ROUTE(app, "/get_status")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
@@ -167,7 +237,7 @@ void Stats::CrowRoute() {
res.set_header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
-
+
res.body = "Not Enabled";
// Send your response
if (enable_faulty_switch_) {
@@ -211,6 +281,46 @@ void Stats::CrowRoute() {
mem_view_json.clear();
res.end();
});
+ CROW_ROUTE(app, "/consensus_data/<int>")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res, int txn_number) {
+ LOG(ERROR) << "API 5: Get transaction " << txn_number;
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ nlohmann::json result;
+ if (summary_db_) {
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+ std::string txn_key = std::to_string(txn_number);
+ std::string value;
+ leveldb::Status status =
+ summary_db_->Get(leveldb::ReadOptions(), txn_key, &value);
+
+ if (status.ok() && !value.empty()) {
+ try {
+ result[txn_key] = nlohmann::json::parse(value);
+ } catch (...) {
+ res.code = 500;
+ result["error"] = "Failed to parse transaction data";
+ }
+ } else {
+ res.code = 404;
+ result["error"] = "Transaction not found";
+ result["txn_number"] = txn_number;
+ }
+ } else {
+ res.code = 503;
+ result["error"] = "Storage not initialized";
+ }
+
+ res.body = result.dump();
+ res.end();
+ });
app.port(8500 + transaction_summary_.port).multithreaded().run();
sleep(1);
} catch (const std::exception& e) {
@@ -234,6 +344,19 @@ void Stats::SetProps(int replica_id, std::string ip, int port,
enable_resview = resview_flag;
enable_faulty_switch_ = faulty_flag;
if (resview_flag) {
+ std::string storage_path = "/tmp/resdb_summaries_" + std::to_string(port);
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::DB* db = nullptr;
+ leveldb::Status status = leveldb::DB::Open(options, storage_path, &db);
+ if (status.ok()) {
+ summary_db_.reset(db);
+ LOG(INFO) << "Initialized LevelDB storage for summaries at: "
+ << storage_path;
+ } else {
+ LOG(ERROR) << "Failed to open LevelDB at " << storage_path << ": "
+ << status.ToString();
+ }
crow_thread_ = std::thread(&Stats::CrowRoute, this);
}
}
@@ -345,8 +468,18 @@ void Stats::SendSummary() {
summary_json_["ext_cache_hit_ratio"] =
transaction_summary_.ext_cache_hit_ratio_;
- consensus_history_[std::to_string(transaction_summary_.txn_number)] =
- summary_json_;
+
+ std::string txn_key = std::to_string(transaction_summary_.txn_number);
+
+ if (summary_db_) {
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+ leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key,
+ summary_json_.dump());
+ if (!status.ok()) {
+ LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key
+ << ": " << status.ToString();
+ }
+ }
LOG(ERROR) << summary_json_.dump();
@@ -423,59 +556,59 @@ void Stats::MonitorGlobal() {
run_req_run_time = run_req_run_time_;
LOG(INFO) << "=========== monitor =========\n"
- << "server call:" << server_call - last_server_call
- << " server process:" << server_process - last_server_process
- << " socket recv:" << socket_recv - last_socket_recv
- << " "
- "client call:"
- << client_call - last_client_call
- << " "
- "client req:"
- << num_client_req - last_num_client_req
- << " "
- "broad_cast:"
- << broad_cast_msg - last_broad_cast_msg
- << " "
- "send broad_cast:"
- << send_broad_cast_msg - last_send_broad_cast_msg
- << " "
- "per send broad_cast:"
- << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
- << " "
- "propose:"
- << num_propose - last_num_propose
- << " "
- "prepare:"
- << (num_prepare - last_num_prepare)
- << " "
- "commit:"
- << (num_commit - last_num_commit)
- << " "
- "pending execute:"
- << pending_execute - last_pending_execute
- << " "
- "execute:"
- << execute - last_execute
- << " "
- "execute done:"
- << execute_done - last_execute_done << " seq gap:" << seq_gap
- << " total request:" << total_request - last_total_request
- << " txn:" << (total_request - last_total_request) / 5
- << " total geo request:"
- << total_geo_request - last_total_geo_request
- << " total geo request per:"
- << (total_geo_request - last_total_geo_request) / 5
- << " geo request:" << (geo_request - last_geo_request)
- << " "
- "seq fail:"
- << seq_fail - last_seq_fail << " time:" << time
- << " "
- "\n--------------- monitor ------------";
+ << "server call:" << server_call - last_server_call
+ << " server process:" << server_process - last_server_process
+ << " socket recv:" << socket_recv - last_socket_recv
+ << " "
+ "client call:"
+ << client_call - last_client_call
+ << " "
+ "client req:"
+ << num_client_req - last_num_client_req
+ << " "
+ "broad_cast:"
+ << broad_cast_msg - last_broad_cast_msg
+ << " "
+ "send broad_cast:"
+ << send_broad_cast_msg - last_send_broad_cast_msg
+ << " "
+ "per send broad_cast:"
+ << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
+ << " "
+ "propose:"
+ << num_propose - last_num_propose
+ << " "
+ "prepare:"
+ << (num_prepare - last_num_prepare)
+ << " "
+ "commit:"
+ << (num_commit - last_num_commit)
+ << " "
+ "pending execute:"
+ << pending_execute - last_pending_execute
+ << " "
+ "execute:"
+ << execute - last_execute
+ << " "
+ "execute done:"
+ << execute_done - last_execute_done << " seq gap:" << seq_gap
+ << " total request:" << total_request - last_total_request
+ << " txn:" << (total_request - last_total_request) / 5
+ << " total geo request:"
+ << total_geo_request - last_total_geo_request
+ << " total geo request per:"
+ << (total_geo_request - last_total_geo_request) / 5
+ << " geo request:" << (geo_request - last_geo_request)
+ << " "
+ "seq fail:"
+ << seq_fail - last_seq_fail << " time:" << time
+ << " "
+ "\n--------------- monitor ------------";
if (run_req_num - last_run_req_num > 0) {
LOG(INFO) << " req client latency:"
- << static_cast<double>(run_req_run_time -
- last_run_req_run_time) /
- (run_req_num - last_run_req_num) / 1000000000.0;
+ << static_cast<double>(run_req_run_time -
+ last_run_req_run_time) /
+ (run_req_num - last_run_req_num) / 1000000000.0;
}
last_seq_fail = seq_fail;
@@ -530,8 +663,10 @@ void Stats::IncPrepare() {
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
- transaction_summary_.prepare_message_count_times_list.push_back(
- std::chrono::system_clock::now());
+ if (enable_resview) {
+ transaction_summary_.prepare_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
+ }
}
void Stats::IncCommit() {
@@ -539,8 +674,10 @@ void Stats::IncCommit() {
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
- transaction_summary_.commit_message_count_times_list.push_back(
- std::chrono::system_clock::now());
+ if (enable_resview) {
+ transaction_summary_.commit_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
+ }
}
void Stats::IncPendingExecute() { pending_execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 849c83d1..0d86bb3e 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -33,6 +33,11 @@
#include "proto/kv/kv.pb.h"
#include "sys/resource.h"
+// Forward declaration for LevelDB
+namespace leveldb {
+class DB;
+}
+
namespace asio = boost::asio;
namespace beast = boost::beast;
using tcp = asio::ip::tcp;
@@ -159,9 +164,11 @@ class Stats {
std::atomic<uint64_t> prev_num_prepare_;
std::atomic<uint64_t> prev_num_commit_;
nlohmann::json summary_json_;
- nlohmann::json consensus_history_;
std::unique_ptr<PrometheusHandler> prometheus_;
+
+ std::unique_ptr<leveldb::DB> summary_db_;
+ std::mutex summary_db_mutex_;
};
} // namespace resdb
diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp
index 269fb807..07b15462 100644
--- a/service/kv/kv_service.cpp
+++ b/service/kv/kv_service.cpp
@@ -51,7 +51,7 @@ int main(int argc, char** argv) {
exit(0);
}
google::InitGoogleLogging(argv[0]);
- FLAGS_minloglevel = 1;
+ FLAGS_minloglevel = 0;
char* config_file = argv[1];
char* private_key_file = argv[2];
diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config
index 61c66f04..68cbee87 100644
--- a/service/tools/config/server/server.config
+++ b/service/tools/config/server/server.config
@@ -48,9 +48,9 @@
},
require_txn_validation:true,
enable_resview:true,
- enable_faulty_switch:true,
- "enable_viewchange":true,
- "recovery_enabled":true,
- "max_client_complaint_num":10
+ enable_faulty_switch:false,
+ enable_viewchange:false,
+ recovery_enabled:true,
+ max_client_complaint_num:10
}