This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 6fb02357e6934bf411156fd90a98bb7eed19d733 Author: harish876 <[email protected]> AuthorDate: Wed Dec 31 02:08:22 2025 +0000 Implement transaction retrieval with sequence number, integrate LevelDB for summary storage, and update server configuration settings. --- .../graphql/service/http_server/crow_service.cpp | 28 ++ .../graphql/service/kv_service/resdb_kv_client.cpp | 15 ++ .../graphql/service/kv_service/resdb_kv_client.h | 11 +- executor/kv/kv_executor.cpp | 1 + .../consensus/execution/transaction_executor.cpp | 2 +- platform/consensus/ordering/pbft/commitment.cpp | 2 +- .../ordering/pbft/viewchange_manager_test.cpp | 1 - platform/statistic/BUILD | 5 + platform/statistic/stats.cpp | 283 +++++++++++++++------ platform/statistic/stats.h | 9 +- service/kv/kv_service.cpp | 2 +- service/tools/config/server/server.config | 8 +- 12 files changed, 283 insertions(+), 84 deletions(-) diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp b/ecosystem/graphql/service/http_server/crow_service.cpp index 03c738c8..c27f44f8 100644 --- a/ecosystem/graphql/service/http_server/crow_service.cpp +++ b/ecosystem/graphql/service/http_server/crow_service.cpp @@ -128,6 +128,34 @@ void CrowService::run() { } }); + // Get value with sequence number of specific id + CROW_ROUTE(app, "/v2/transactions/<string>") + ([this](const crow::request &req, response &res, std::string id) { + auto result = kv_client_.GetWithSeq(id); + if (result != nullptr) { + LOG(INFO) << "client get value with seq = " << result->second + << ", seq = " << result->first; + + // Send updated blocks list to websocket + if (users.size() > 0) { + for (auto u : users) u->send_text("Update blocks"); + } + + num_transactions_++; + + crow::json::wvalue resp; + resp["value"] = result->second; + resp["seq"] = result->first; + + res.set_header("Content-Type", "application/json"); + res.end(resp.dump()); + } else { + res.code = 500; + res.set_header("Content-Type", "text/plain"); + res.end("get value with seq fail"); + } + }); + // Get values based on key range CROW_ROUTE(app, "/v1/transactions/<string>/<string>") ([this](const crow::request &req, response &res, std::string min_id, diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp index 2273cf98..0baab27c 100644 --- a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp +++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp @@ -64,6 +64,21 @@ std::unique_ptr<std::string> ResDBKVClient::Get(const std::string &key) { return std::make_unique<std::string>(response.value()); } +std::unique_ptr<std::pair<int64_t, std::string>> ResDBKVClient::GetWithSeq( + const std::string &key) { + KVRequest request; + request.set_cmd(KVRequest::GET); + request.set_key(key); + KVResponse response; + int ret = SendRequest(request, &response); + if (ret != 0) { + LOG(ERROR) << "send request fail, ret:" << ret; + return nullptr; + } + return std::make_unique<std::pair<int64_t, std::string>>( + response.seq(), response.value()); +} + std::unique_ptr<std::string> ResDBKVClient::GetAllValues() { KVRequest request; request.set_cmd(KVRequest::GETALLVALUES); diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.h b/ecosystem/graphql/service/kv_service/resdb_kv_client.h index ac35afc0..20249b62 100644 --- a/ecosystem/graphql/service/kv_service/resdb_kv_client.h +++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.h @@ -19,21 +19,28 @@ #pragma once +#include <cstdint> +#include <memory> +#include <string> +#include <utility> + #include "interface/rdbc/transaction_constructor.h" namespace sdk { // ResDBKVClient to send data to the kv server. class ResDBKVClient : public resdb::TransactionConstructor { -public: + public: ResDBKVClient(const resdb::ResDBConfig &config); int Set(const std::string &key, const std::string &data); int64_t SetWithSeq(const std::string &key, const std::string &data); + std::unique_ptr<std::pair<int64_t, std::string>> GetWithSeq( + const std::string &key); std::unique_ptr<std::string> Get(const std::string &key); std::unique_ptr<std::string> GetAllValues(); std::unique_ptr<std::string> GetRange(const std::string &min_key, const std::string &max_key); }; -} // namespace sdk +} // namespace sdk diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp index f6fe8b86..78ed5710 100644 --- a/executor/kv/kv_executor.cpp +++ b/executor/kv/kv_executor.cpp @@ -61,6 +61,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteRequest( kv_response.set_seq(kv_request.seq()); } else if (kv_request.cmd() == KVRequest::GET) { kv_response.set_value(Get(kv_request.key())); + kv_response.set_seq(kv_request.seq()); } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { kv_response.set_value(GetAllValues()); } else if (kv_request.cmd() == KVRequest::GETRANGE) { diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index 12f4b49a..f70be048 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -335,7 +335,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, response->set_seq(request->seq()); if (post_exec_func_) { - post_exec_func_(std::move(request), std::move(response)); //TODO: request_tracking + post_exec_func_(std::move(request), std::move(response)); } global_stats_->IncExecuteDone(); diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 4a1aacc0..d75038aa 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -296,7 +296,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, message_manager_->AddConsensusMsg(context->signature, std::move(request)); if (ret == CollectorResultCode::STATE_CHANGED) { // LOG(ERROR)<<request->data().size(); - // global_stats_->GetTransactionDetails(request->data()); + //global_stats_->GetTransactionDetails(request->data()); global_stats_->RecordStateTime("commit"); } return ret == CollectorResultCode::INVALID ? -2 : 0; diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp index 43b87541..1803dd0b 100644 --- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp +++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp @@ -89,7 +89,6 @@ TEST_F(ViewChangeManagerTest, SendViewChange) { request->set_hash(SignatureVerifier::CalculateHash("test_data")); checkpoint_manager_->AddCommitData(std::move(request)); - // Wait a bit for the request to be processed (last_seq_ to be updated) std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::promise<bool> propose_done; diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD index 3094840b..4c0f32bf 100644 --- a/platform/statistic/BUILD +++ b/platform/statistic/BUILD @@ -26,6 +26,10 @@ cc_library( name = "stats", srcs = ["stats.cpp"], hdrs = ["stats.h"], + copts = select({ + "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"], + "//conditions:default": [], + }), deps = [ ":prometheus_handler", "//common:asio", @@ -38,6 +42,7 @@ cc_library( "//proto/kv:kv_cc_proto", "//third_party:crow", "//third_party:prometheus", + "//third_party:leveldb", ], ) diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 2d6099c4..0afb219a 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -22,8 +22,12 @@ #include <glog/logging.h> #include <ctime> +#include <sstream> #include "common/utils/utils.h" +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/write_batch.h" #include "proto/kv/kv.pb.h" namespace asio = boost::asio; @@ -124,22 +128,88 @@ void Stats::CrowRoute() { crow::SimpleApp app; while (!stop_) { try { + //Deprecated CROW_ROUTE(app, "/consensus_data") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 1"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body = consensus_history_.dump(); - res.end(); - }); + .methods("GET"_method)( + [this](const crow::request& req, crow::response& res) { + LOG(ERROR) << "API 1"; + res.set_header("Access-Control-Allow-Origin", "*"); + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", + "Content-Type, Authorization"); + + int limit = 10; + int page = 1; + + std::string url = req.url; + size_t query_pos = url.find('?'); + if (query_pos != std::string::npos) { + std::string query_string = url.substr(query_pos + 1); + std::istringstream iss(query_string); + std::string param; + while (std::getline(iss, param, '&')) { + size_t eq_pos = param.find('='); + if (eq_pos != std::string::npos) { + std::string key = param.substr(0, eq_pos); + std::string value = param.substr(eq_pos + 1); + if (key == "limit") { + try { + limit = std::stoi(value); + if (limit <= 0) limit = 10; + } catch (...) { + limit = 10; + } + } else if (key == "page") { + try { + page = std::stoi(value); + if (page <= 0) page = 1; + } catch (...) { + page = 1; + } + } + } + } + } + + nlohmann::json result; + int total_count = 0; + int offset = (page - 1) * limit; + int current_index = 0; + int items_collected = 0; + + if (summary_db_) { + std::lock_guard<std::mutex> lock(summary_db_mutex_); + leveldb::Iterator* it = + summary_db_->NewIterator(leveldb::ReadOptions()); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + total_count++; + } + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + if (current_index >= offset && items_collected < limit) { + try { + result[it->key().ToString()] = + nlohmann::json::parse(it->value().ToString()); + items_collected++; + } catch (...) { + res.code = 500; + result["error"] = "Failed to parse transaction data"; + delete it; + res.body = result.dump(); + res.end(); + return; + } + } + current_index++; + } + delete it; + } + + res.body = result.dump(); + res.end(); + }); CROW_ROUTE(app, "/get_status") .methods("GET"_method)([this](const crow::request& req, crow::response& res) { @@ -167,7 +237,7 @@ void Stats::CrowRoute() { res.set_header( "Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers - + res.body = "Not Enabled"; // Send your response if (enable_faulty_switch_) { @@ -211,6 +281,46 @@ void Stats::CrowRoute() { mem_view_json.clear(); res.end(); }); + CROW_ROUTE(app, "/consensus_data/<int>") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res, int txn_number) { + LOG(ERROR) << "API 5: Get transaction " << txn_number; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + nlohmann::json result; + if (summary_db_) { + std::lock_guard<std::mutex> lock(summary_db_mutex_); + std::string txn_key = std::to_string(txn_number); + std::string value; + leveldb::Status status = + summary_db_->Get(leveldb::ReadOptions(), txn_key, &value); + + if (status.ok() && !value.empty()) { + try { + result[txn_key] = nlohmann::json::parse(value); + } catch (...) { + res.code = 500; + result["error"] = "Failed to parse transaction data"; + } + } else { + res.code = 404; + result["error"] = "Transaction not found"; + result["txn_number"] = txn_number; + } + } else { + res.code = 503; + result["error"] = "Storage not initialized"; + } + + res.body = result.dump(); + res.end(); + }); app.port(8500 + transaction_summary_.port).multithreaded().run(); sleep(1); } catch (const std::exception& e) { @@ -234,6 +344,19 @@ void Stats::SetProps(int replica_id, std::string ip, int port, enable_resview = resview_flag; enable_faulty_switch_ = faulty_flag; if (resview_flag) { + std::string storage_path = "/tmp/resdb_summaries_" + std::to_string(port); + leveldb::Options options; + options.create_if_missing = true; + leveldb::DB* db = nullptr; + leveldb::Status status = leveldb::DB::Open(options, storage_path, &db); + if (status.ok()) { + summary_db_.reset(db); + LOG(INFO) << "Initialized LevelDB storage for summaries at: " + << storage_path; + } else { + LOG(ERROR) << "Failed to open LevelDB at " << storage_path << ": " + << status.ToString(); + } crow_thread_ = std::thread(&Stats::CrowRoute, this); } } @@ -345,8 +468,18 @@ void Stats::SendSummary() { summary_json_["ext_cache_hit_ratio"] = transaction_summary_.ext_cache_hit_ratio_; - consensus_history_[std::to_string(transaction_summary_.txn_number)] = - summary_json_; + + std::string txn_key = std::to_string(transaction_summary_.txn_number); + + if (summary_db_) { + std::lock_guard<std::mutex> lock(summary_db_mutex_); + leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key, + summary_json_.dump()); + if (!status.ok()) { + LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key + << ": " << status.ToString(); + } + } LOG(ERROR) << summary_json_.dump(); @@ -423,59 +556,59 @@ void Stats::MonitorGlobal() { run_req_run_time = run_req_run_time_; LOG(INFO) << "=========== monitor =========\n" - << "server call:" << server_call - last_server_call - << " server process:" << server_process - last_server_process - << " socket recv:" << socket_recv - last_socket_recv - << " " - "client call:" - << client_call - last_client_call - << " " - "client req:" - << num_client_req - last_num_client_req - << " " - "broad_cast:" - << broad_cast_msg - last_broad_cast_msg - << " " - "send broad_cast:" - << send_broad_cast_msg - last_send_broad_cast_msg - << " " - "per send broad_cast:" - << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep - << " " - "propose:" - << num_propose - last_num_propose - << " " - "prepare:" - << (num_prepare - last_num_prepare) - << " " - "commit:" - << (num_commit - last_num_commit) - << " " - "pending execute:" - << pending_execute - last_pending_execute - << " " - "execute:" - << execute - last_execute - << " " - "execute done:" - << execute_done - last_execute_done << " seq gap:" << seq_gap - << " total request:" << total_request - last_total_request - << " txn:" << (total_request - last_total_request) / 5 - << " total geo request:" - << total_geo_request - last_total_geo_request - << " total geo request per:" - << (total_geo_request - last_total_geo_request) / 5 - << " geo request:" << (geo_request - last_geo_request) - << " " - "seq fail:" - << seq_fail - last_seq_fail << " time:" << time - << " " - "\n--------------- monitor ------------"; + << "server call:" << server_call - last_server_call + << " server process:" << server_process - last_server_process + << " socket recv:" << socket_recv - last_socket_recv + << " " + "client call:" + << client_call - last_client_call + << " " + "client req:" + << num_client_req - last_num_client_req + << " " + "broad_cast:" + << broad_cast_msg - last_broad_cast_msg + << " " + "send broad_cast:" + << send_broad_cast_msg - last_send_broad_cast_msg + << " " + "per send broad_cast:" + << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep + << " " + "propose:" + << num_propose - last_num_propose + << " " + "prepare:" + << (num_prepare - last_num_prepare) + << " " + "commit:" + << (num_commit - last_num_commit) + << " " + "pending execute:" + << pending_execute - last_pending_execute + << " " + "execute:" + << execute - last_execute + << " " + "execute done:" + << execute_done - last_execute_done << " seq gap:" << seq_gap + << " total request:" << total_request - last_total_request + << " txn:" << (total_request - last_total_request) / 5 + << " total geo request:" + << total_geo_request - last_total_geo_request + << " total geo request per:" + << (total_geo_request - last_total_geo_request) / 5 + << " geo request:" << (geo_request - last_geo_request) + << " " + "seq fail:" + << seq_fail - last_seq_fail << " time:" << time + << " " + "\n--------------- monitor ------------"; if (run_req_num - last_run_req_num > 0) { LOG(INFO) << " req client latency:" - << static_cast<double>(run_req_run_time - - last_run_req_run_time) / - (run_req_num - last_run_req_num) / 1000000000.0; + << static_cast<double>(run_req_run_time - + last_run_req_run_time) / + (run_req_num - last_run_req_num) / 1000000000.0; } last_seq_fail = seq_fail; @@ -530,8 +663,10 @@ void Stats::IncPrepare() { prometheus_->Inc(PREPARE, 1); } num_prepare_++; - transaction_summary_.prepare_message_count_times_list.push_back( - std::chrono::system_clock::now()); + if (enable_resview) { + transaction_summary_.prepare_message_count_times_list.push_back( + std::chrono::system_clock::now()); + } } void Stats::IncCommit() { @@ -539,8 +674,10 @@ void Stats::IncCommit() { prometheus_->Inc(COMMIT, 1); } num_commit_++; - transaction_summary_.commit_message_count_times_list.push_back( - std::chrono::system_clock::now()); + if (enable_resview) { + transaction_summary_.commit_message_count_times_list.push_back( + std::chrono::system_clock::now()); + } } void Stats::IncPendingExecute() { pending_execute_++; } diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 849c83d1..0d86bb3e 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -33,6 +33,11 @@ #include "proto/kv/kv.pb.h" #include "sys/resource.h" +// Forward declaration for LevelDB +namespace leveldb { +class DB; +} + namespace asio = boost::asio; namespace beast = boost::beast; using tcp = asio::ip::tcp; @@ -159,9 +164,11 @@ class Stats { std::atomic<uint64_t> prev_num_prepare_; std::atomic<uint64_t> prev_num_commit_; nlohmann::json summary_json_; - nlohmann::json consensus_history_; std::unique_ptr<PrometheusHandler> prometheus_; + + std::unique_ptr<leveldb::DB> summary_db_; + std::mutex summary_db_mutex_; }; } // namespace resdb diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp index 269fb807..07b15462 100644 --- a/service/kv/kv_service.cpp +++ b/service/kv/kv_service.cpp @@ -51,7 +51,7 @@ int main(int argc, char** argv) { exit(0); } google::InitGoogleLogging(argv[0]); - FLAGS_minloglevel = 1; + FLAGS_minloglevel = 0; char* config_file = argv[1]; char* private_key_file = argv[2]; diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config index 61c66f04..68cbee87 100644 --- a/service/tools/config/server/server.config +++ b/service/tools/config/server/server.config @@ -48,9 +48,9 @@ }, require_txn_validation:true, enable_resview:true, - enable_faulty_switch:true, - "enable_viewchange":true, - "recovery_enabled":true, - "max_client_complaint_num":10 + enable_faulty_switch:false, + enable_viewchange:false, + recovery_enabled:true, + max_client_complaint_num:10 }
