This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch demo
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 6fb02357e6934bf411156fd90a98bb7eed19d733
Author: harish876 <[email protected]>
AuthorDate: Wed Dec 31 02:08:22 2025 +0000

    Implement transaction retrieval with sequence number, integrate LevelDB for 
summary storage, and update server configuration settings.
---
 .../graphql/service/http_server/crow_service.cpp   |  28 ++
 .../graphql/service/kv_service/resdb_kv_client.cpp |  15 ++
 .../graphql/service/kv_service/resdb_kv_client.h   |  11 +-
 executor/kv/kv_executor.cpp                        |   1 +
 .../consensus/execution/transaction_executor.cpp   |   2 +-
 platform/consensus/ordering/pbft/commitment.cpp    |   2 +-
 .../ordering/pbft/viewchange_manager_test.cpp      |   1 -
 platform/statistic/BUILD                           |   5 +
 platform/statistic/stats.cpp                       | 283 +++++++++++++++------
 platform/statistic/stats.h                         |   9 +-
 service/kv/kv_service.cpp                          |   2 +-
 service/tools/config/server/server.config          |   8 +-
 12 files changed, 283 insertions(+), 84 deletions(-)

diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp 
b/ecosystem/graphql/service/http_server/crow_service.cpp
index 03c738c8..c27f44f8 100644
--- a/ecosystem/graphql/service/http_server/crow_service.cpp
+++ b/ecosystem/graphql/service/http_server/crow_service.cpp
@@ -128,6 +128,34 @@ void CrowService::run() {
     }
   });
 
+  // GET /v2/transactions/<id>: look up the value stored under `id` and reply
+  // with JSON {"value": ..., "seq": ...} as returned by GetWithSeq().
+  // Replies 500 with a plain-text message when the lookup returns nullptr.
+  CROW_ROUTE(app, "/v2/transactions/<string>")
+  ([this](const crow::request &req, response &res, std::string id) {
+    auto result = kv_client_.GetWithSeq(id);
+    if (result == nullptr) {
+      res.code = 500;
+      res.set_header("Content-Type", "text/plain");
+      res.end("get value with seq fail");
+      return;
+    }
+    // The pair is (seq, value): `first` is the seq, `second` is the value.
+    LOG(INFO) << "client get value = " << result->second
+              << ", seq = " << result->first;
+
+    // Send updated blocks list to websocket (no-op when `users` is empty).
+    for (auto u : users) u->send_text("Update blocks");
+
+    num_transactions_++;
+
+    crow::json::wvalue resp;
+    resp["value"] = result->second;
+    resp["seq"] = result->first;
+    res.set_header("Content-Type", "application/json");
+    res.end(resp.dump());
+  });
+
   // Get values based on key range
   CROW_ROUTE(app, "/v1/transactions/<string>/<string>")
   ([this](const crow::request &req, response &res, std::string min_id,
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp 
b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
index 2273cf98..0baab27c 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
@@ -64,6 +64,21 @@ std::unique_ptr<std::string> ResDBKVClient::Get(const 
std::string &key) {
   return std::make_unique<std::string>(response.value());
 }
 
+// GET `key`: returns (seq, value), or nullptr if SendRequest returns non-zero.
+std::unique_ptr<std::pair<int64_t, std::string>> ResDBKVClient::GetWithSeq(
+    const std::string &key) {
+  KVRequest get_request;
+  get_request.set_cmd(KVRequest::GET);
+  get_request.set_key(key);
+  KVResponse reply;
+  if (const int status = SendRequest(get_request, &reply)) {  // non-zero
+    LOG(ERROR) << "send request fail, ret:" << status;
+    return nullptr;
+  }
+  using SeqAndValue = std::pair<int64_t, std::string>;
+  return std::make_unique<SeqAndValue>(reply.seq(), reply.value());
+}
+
 std::unique_ptr<std::string> ResDBKVClient::GetAllValues() {
   KVRequest request;
   request.set_cmd(KVRequest::GETALLVALUES);
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.h 
b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
index ac35afc0..20249b62 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.h
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
@@ -19,21 +19,28 @@
 
 #pragma once
 
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
 #include "interface/rdbc/transaction_constructor.h"
 
 namespace sdk {
 
 // ResDBKVClient to send data to the kv server.
 class ResDBKVClient : public resdb::TransactionConstructor {
-public:
+ public:
   ResDBKVClient(const resdb::ResDBConfig &config);
 
   int Set(const std::string &key, const std::string &data);
   int64_t SetWithSeq(const std::string &key, const std::string &data);
+  std::unique_ptr<std::pair<int64_t, std::string>> GetWithSeq(
+      const std::string &key);
   std::unique_ptr<std::string> Get(const std::string &key);
   std::unique_ptr<std::string> GetAllValues();
   std::unique_ptr<std::string> GetRange(const std::string &min_key,
                                         const std::string &max_key);
 };
 
-} // namespace sdk
+}  // namespace sdk
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index f6fe8b86..78ed5710 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -61,6 +61,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteRequest(
     kv_response.set_seq(kv_request.seq());
   } else if (kv_request.cmd() == KVRequest::GET) {
     kv_response.set_value(Get(kv_request.key()));
+    kv_response.set_seq(kv_request.seq());
   } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
     kv_response.set_value(GetAllValues());
   } else if (kv_request.cmd() == KVRequest::GETRANGE) {
diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index 12f4b49a..f70be048 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -335,7 +335,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
   response->set_seq(request->seq());
 
   if (post_exec_func_) {
-    post_exec_func_(std::move(request), std::move(response)); //TODO: 
request_tracking
+    post_exec_func_(std::move(request), std::move(response));
   }
 
   global_stats_->IncExecuteDone();
diff --git a/platform/consensus/ordering/pbft/commitment.cpp 
b/platform/consensus/ordering/pbft/commitment.cpp
index 4a1aacc0..d75038aa 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -296,7 +296,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> 
context,
       message_manager_->AddConsensusMsg(context->signature, 
std::move(request));
   if (ret == CollectorResultCode::STATE_CHANGED) {
     // LOG(ERROR)<<request->data().size();
-    // global_stats_->GetTransactionDetails(request->data());
+    //global_stats_->GetTransactionDetails(request->data());
     global_stats_->RecordStateTime("commit");
   }
   return ret == CollectorResultCode::INVALID ? -2 : 0;
diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp 
b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
index 43b87541..1803dd0b 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
@@ -89,7 +89,6 @@ TEST_F(ViewChangeManagerTest, SendViewChange) {
   request->set_hash(SignatureVerifier::CalculateHash("test_data"));
   checkpoint_manager_->AddCommitData(std::move(request));
   
-  // Wait a bit for the request to be processed (last_seq_ to be updated)
   std::this_thread::sleep_for(std::chrono::milliseconds(100));
   
   std::promise<bool> propose_done;
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 3094840b..4c0f32bf 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -26,6 +26,10 @@ cc_library(
     name = "stats",
     srcs = ["stats.cpp"],
     hdrs = ["stats.h"],
+    copts = select({
+        "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"],
+        "//conditions:default": [],
+    }),
     deps = [
         ":prometheus_handler",
         "//common:asio",
@@ -38,6 +42,7 @@ cc_library(
         "//proto/kv:kv_cc_proto",
         "//third_party:crow",
         "//third_party:prometheus",
+        "//third_party:leveldb",
     ],
 )
 
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 2d6099c4..0afb219a 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -22,8 +22,12 @@
 #include <glog/logging.h>
 
 #include <ctime>
+#include <sstream>
 
 #include "common/utils/utils.h"
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/write_batch.h"
 #include "proto/kv/kv.pb.h"
 
 namespace asio = boost::asio;
@@ -124,22 +128,88 @@ void Stats::CrowRoute() {
   crow::SimpleApp app;
   while (!stop_) {
     try {
+      // Deprecated. Paginated view of the stored summaries as one JSON object.
       CROW_ROUTE(app, "/consensus_data")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res) {
-            LOG(ERROR) << "API 1";
-            res.set_header("Access-Control-Allow-Origin",
-                           "*");  // Allow requests from any origin
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");  // Specify allowed methods
-            res.set_header(
-                "Access-Control-Allow-Headers",
-                "Content-Type, Authorization");  // Specify allowed headers
-
-            // Send your response
-            res.body = consensus_history_.dump();
-            res.end();
-          });
+          .methods("GET"_method)(
+              [this](const crow::request& req, crow::response& res) {
+                LOG(ERROR) << "API 1";
+                res.set_header("Access-Control-Allow-Origin", "*");
+                res.set_header("Access-Control-Allow-Methods",
+                               "GET, POST, OPTIONS");
+                res.set_header("Access-Control-Allow-Headers",
+                               "Content-Type, Authorization");
+
+                // Paging: ?limit=<n>&page=<p>; bad values keep the defaults.
+                int limit = 10;
+                int page = 1;
+
+                std::string url = req.url;
+                size_t query_pos = url.find('?');
+                if (query_pos != std::string::npos) {
+                  std::string query_string = url.substr(query_pos + 1);
+                  std::istringstream iss(query_string);
+                  std::string param;
+                  while (std::getline(iss, param, '&')) {
+                    size_t eq_pos = param.find('=');
+                    if (eq_pos != std::string::npos) {
+                      std::string key = param.substr(0, eq_pos);
+                      std::string value = param.substr(eq_pos + 1);
+                      if (key == "limit") {
+                        try {
+                          limit = std::stoi(value);
+                          if (limit <= 0) limit = 10;
+                        } catch (...) {
+                          limit = 10;
+                        }
+                      } else if (key == "page") {
+                        try {
+                          page = std::stoi(value);
+                          if (page <= 0) page = 1;
+                        } catch (...) {
+                          page = 1;
+                        }
+                      }
+                    }
+                  }
+                }
+
+                nlohmann::json result;
+                // Widen before multiplying: int math could overflow here.
+                const size_t offset =
+                    static_cast<size_t>(page - 1) * static_cast<size_t>(limit);
+                size_t current_index = 0;
+                int items_collected = 0;
+
+                if (summary_db_) {
+                  std::lock_guard<std::mutex> lock(summary_db_mutex_);
+                  // unique_ptr releases the iterator on every exit path.
+                  std::unique_ptr<leveldb::Iterator> it(
+                      summary_db_->NewIterator(leveldb::ReadOptions()));
+                  // Single pass that stops once the requested page is full.
+                  for (it->SeekToFirst();
+                       it->Valid() && items_collected < limit; it->Next()) {
+                    if (current_index++ < offset) continue;
+                    try {
+                      result[it->key().ToString()] =
+                          nlohmann::json::parse(it->value().ToString());
+                      items_collected++;
+                    } catch (...) {
+                      res.code = 500;
+                      result["error"] = "Failed to parse transaction data";
+                      res.body = result.dump();
+                      res.end();
+                      return;
+                    }
+                  }
+                  if (!it->status().ok()) {
+                    LOG(ERROR) << "Failed to iterate summaries: "
+                               << it->status().ToString();
+                  }
+                }
+
+                res.body = result.dump();
+                res.end();
+              });
       CROW_ROUTE(app, "/get_status")
           .methods("GET"_method)([this](const crow::request& req,
                                         crow::response& res) {
@@ -167,7 +237,7 @@ void Stats::CrowRoute() {
             res.set_header(
                 "Access-Control-Allow-Headers",
                 "Content-Type, Authorization");  // Specify allowed headers
-            
+
             res.body = "Not Enabled";
             // Send your response
             if (enable_faulty_switch_) {
@@ -211,6 +281,46 @@ void Stats::CrowRoute() {
             mem_view_json.clear();
             res.end();
           });
+      // GET /consensus_data/<int>: replies {"<txn_number>": <stored summary>};
+      // 503 without storage, 404 on failed lookup/empty value, 500 on bad JSON.
+      CROW_ROUTE(app, "/consensus_data/<int>")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res, int txn_number) {
+            LOG(ERROR) << "API 5: Get transaction " << txn_number;
+            // CORS: allow requests from any origin.
+            res.set_header("Access-Control-Allow-Origin", "*");
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");
+            res.set_header("Access-Control-Allow-Headers",
+                           "Content-Type, Authorization");
+
+            nlohmann::json result;
+            if (!summary_db_) {
+              res.code = 503;
+              result["error"] = "Storage not initialized";
+            } else {
+              std::lock_guard<std::mutex> guard(summary_db_mutex_);
+              const std::string txn_key = std::to_string(txn_number);
+              std::string stored;
+              const leveldb::Status lookup =
+                  summary_db_->Get(leveldb::ReadOptions(), txn_key, &stored);
+              if (!lookup.ok() || stored.empty()) {
+                res.code = 404;
+                result["error"] = "Transaction not found";
+                result["txn_number"] = txn_number;
+              } else {
+                try {
+                  result[txn_key] = nlohmann::json::parse(stored);
+                } catch (...) {
+                  res.code = 500;
+                  result["error"] = "Failed to parse transaction data";
+                }
+              }
+            }
+
+            res.body = result.dump();
+            res.end();
+          });
       app.port(8500 + transaction_summary_.port).multithreaded().run();
       sleep(1);
     } catch (const std::exception& e) {
@@ -234,6 +344,19 @@ void Stats::SetProps(int replica_id, std::string ip, int 
port,
   enable_resview = resview_flag;
   enable_faulty_switch_ = faulty_flag;
   if (resview_flag) {
+    std::string storage_path = "/tmp/resdb_summaries_" + std::to_string(port);
+    leveldb::Options options;
+    options.create_if_missing = true;
+    leveldb::DB* db = nullptr;
+    leveldb::Status status = leveldb::DB::Open(options, storage_path, &db);
+    if (status.ok()) {
+      summary_db_.reset(db);
+      LOG(INFO) << "Initialized LevelDB storage for summaries at: "
+                << storage_path;
+    } else {
+      LOG(ERROR) << "Failed to open LevelDB at " << storage_path << ": "
+                 << status.ToString();
+    }
     crow_thread_ = std::thread(&Stats::CrowRoute, this);
   }
 }
@@ -345,8 +468,18 @@ void Stats::SendSummary() {
 
   summary_json_["ext_cache_hit_ratio"] =
       transaction_summary_.ext_cache_hit_ratio_;
-  consensus_history_[std::to_string(transaction_summary_.txn_number)] =
-      summary_json_;
+
+  std::string txn_key = std::to_string(transaction_summary_.txn_number);
+
+  if (summary_db_) {
+    std::lock_guard<std::mutex> lock(summary_db_mutex_);
+    leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key,
+                                              summary_json_.dump());
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key
+                 << ": " << status.ToString();
+    }
+  }
 
   LOG(ERROR) << summary_json_.dump();
 
@@ -423,59 +556,59 @@ void Stats::MonitorGlobal() {
     run_req_run_time = run_req_run_time_;
 
     LOG(INFO) << "=========== monitor =========\n"
-               << "server call:" << server_call - last_server_call
-               << " server process:" << server_process - last_server_process
-               << " socket recv:" << socket_recv - last_socket_recv
-               << " "
-                  "client call:"
-               << client_call - last_client_call
-               << " "
-                  "client req:"
-               << num_client_req - last_num_client_req
-               << " "
-                  "broad_cast:"
-               << broad_cast_msg - last_broad_cast_msg
-               << " "
-                  "send broad_cast:"
-               << send_broad_cast_msg - last_send_broad_cast_msg
-               << " "
-                  "per send broad_cast:"
-               << send_broad_cast_msg_per_rep - 
last_send_broad_cast_msg_per_rep
-               << " "
-                  "propose:"
-               << num_propose - last_num_propose
-               << " "
-                  "prepare:"
-               << (num_prepare - last_num_prepare)
-               << " "
-                  "commit:"
-               << (num_commit - last_num_commit)
-               << " "
-                  "pending execute:"
-               << pending_execute - last_pending_execute
-               << " "
-                  "execute:"
-               << execute - last_execute
-               << " "
-                  "execute done:"
-               << execute_done - last_execute_done << " seq gap:" << seq_gap
-               << " total request:" << total_request - last_total_request
-               << " txn:" << (total_request - last_total_request) / 5
-               << " total geo request:"
-               << total_geo_request - last_total_geo_request
-               << " total geo request per:"
-               << (total_geo_request - last_total_geo_request) / 5
-               << " geo request:" << (geo_request - last_geo_request)
-               << " "
-                  "seq fail:"
-               << seq_fail - last_seq_fail << " time:" << time
-               << " "
-                  "\n--------------- monitor ------------";
+              << "server call:" << server_call - last_server_call
+              << " server process:" << server_process - last_server_process
+              << " socket recv:" << socket_recv - last_socket_recv
+              << " "
+                 "client call:"
+              << client_call - last_client_call
+              << " "
+                 "client req:"
+              << num_client_req - last_num_client_req
+              << " "
+                 "broad_cast:"
+              << broad_cast_msg - last_broad_cast_msg
+              << " "
+                 "send broad_cast:"
+              << send_broad_cast_msg - last_send_broad_cast_msg
+              << " "
+                 "per send broad_cast:"
+              << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
+              << " "
+                 "propose:"
+              << num_propose - last_num_propose
+              << " "
+                 "prepare:"
+              << (num_prepare - last_num_prepare)
+              << " "
+                 "commit:"
+              << (num_commit - last_num_commit)
+              << " "
+                 "pending execute:"
+              << pending_execute - last_pending_execute
+              << " "
+                 "execute:"
+              << execute - last_execute
+              << " "
+                 "execute done:"
+              << execute_done - last_execute_done << " seq gap:" << seq_gap
+              << " total request:" << total_request - last_total_request
+              << " txn:" << (total_request - last_total_request) / 5
+              << " total geo request:"
+              << total_geo_request - last_total_geo_request
+              << " total geo request per:"
+              << (total_geo_request - last_total_geo_request) / 5
+              << " geo request:" << (geo_request - last_geo_request)
+              << " "
+                 "seq fail:"
+              << seq_fail - last_seq_fail << " time:" << time
+              << " "
+                 "\n--------------- monitor ------------";
     if (run_req_num - last_run_req_num > 0) {
       LOG(INFO) << "  req client latency:"
-                 << static_cast<double>(run_req_run_time -
-                                        last_run_req_run_time) /
-                        (run_req_num - last_run_req_num) / 1000000000.0;
+                << static_cast<double>(run_req_run_time -
+                                       last_run_req_run_time) /
+                       (run_req_num - last_run_req_num) / 1000000000.0;
     }
 
     last_seq_fail = seq_fail;
@@ -530,8 +663,10 @@ void Stats::IncPrepare() {
     prometheus_->Inc(PREPARE, 1);
   }
   num_prepare_++;
-  transaction_summary_.prepare_message_count_times_list.push_back(
-      std::chrono::system_clock::now());
+  if (enable_resview) {
+    transaction_summary_.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
 }
 
 void Stats::IncCommit() {
@@ -539,8 +674,10 @@ void Stats::IncCommit() {
     prometheus_->Inc(COMMIT, 1);
   }
   num_commit_++;
-  transaction_summary_.commit_message_count_times_list.push_back(
-      std::chrono::system_clock::now());
+  if (enable_resview) {
+    transaction_summary_.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
 }
 
 void Stats::IncPendingExecute() { pending_execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 849c83d1..0d86bb3e 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -33,6 +33,11 @@
 #include "proto/kv/kv.pb.h"
 #include "sys/resource.h"
 
+// Forward declaration for LevelDB
+namespace leveldb {
+class DB;
+}
+
 namespace asio = boost::asio;
 namespace beast = boost::beast;
 using tcp = asio::ip::tcp;
@@ -159,9 +164,11 @@ class Stats {
   std::atomic<uint64_t> prev_num_prepare_;
   std::atomic<uint64_t> prev_num_commit_;
   nlohmann::json summary_json_;
-  nlohmann::json consensus_history_;
 
   std::unique_ptr<PrometheusHandler> prometheus_;
+  
+  std::unique_ptr<leveldb::DB> summary_db_;
+  std::mutex summary_db_mutex_;
 };
 
 }  // namespace resdb
diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp
index 269fb807..07b15462 100644
--- a/service/kv/kv_service.cpp
+++ b/service/kv/kv_service.cpp
@@ -51,7 +51,7 @@ int main(int argc, char** argv) {
     exit(0);
   }
   google::InitGoogleLogging(argv[0]);
-  FLAGS_minloglevel = 1;
+  FLAGS_minloglevel = 0;
 
   char* config_file = argv[1];
   char* private_key_file = argv[2];
diff --git a/service/tools/config/server/server.config 
b/service/tools/config/server/server.config
index 61c66f04..68cbee87 100644
--- a/service/tools/config/server/server.config
+++ b/service/tools/config/server/server.config
@@ -48,9 +48,9 @@
   },
   require_txn_validation:true,
   enable_resview:true,
-  enable_faulty_switch:true,
-  "enable_viewchange":true,
-  "recovery_enabled":true,
-  "max_client_complaint_num":10
+  enable_faulty_switch:false,
+  enable_viewchange:false,
+  recovery_enabled:true,
+  max_client_complaint_num:10
 }
 

Reply via email to