This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/development by this push:
     new bd1f6e3f Request Tracing
bd1f6e3f is described below

commit bd1f6e3f439f0ec2f3be516675785830fb2c9406
Author: harish876 <[email protected]>
AuthorDate: Mon Dec 15 04:40:49 2025 +0000

    Request Tracing
---
 ecosystem/graphql/WORKSPACE                        |  29 +++
 .../graphql/service/http_server/crow_service.cpp   | 265 ++++++++++++---------
 .../service/kv_service/proto/kv_server.proto       |   1 +
 .../graphql/service/kv_service/resdb_kv_client.cpp |  22 +-
 .../graphql/service/kv_service/resdb_kv_client.h   |   1 +
 ecosystem/graphql/third_party/BUILD                |   7 +
 .../graphql/third_party/{BUILD => json.BUILD}      |  23 +-
 executor/common/transaction_manager.cpp            |  11 +-
 executor/common/transaction_manager.h              |   3 +
 executor/kv/kv_executor.cpp                        |  13 +
 executor/kv/kv_executor.h                          |   3 +
 .../consensus/execution/transaction_executor.cpp   |  95 ++++----
 proto/kv/kv.proto                                  |   2 +
 .../kv/server_tools/generate_keys_and_certs.sh     |   2 +-
 14 files changed, 285 insertions(+), 192 deletions(-)

diff --git a/ecosystem/graphql/WORKSPACE b/ecosystem/graphql/WORKSPACE
index 2ae0146f..9a8f94ce 100644
--- a/ecosystem/graphql/WORKSPACE
+++ b/ecosystem/graphql/WORKSPACE
@@ -19,6 +19,22 @@
 
 load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
 
+http_archive(
+    name = "hedron_compile_commands",
+    #Replace the commit hash (4f28899228fb3ad0126897876f147ca15026151e) with the latest commit hash from the repo
+    url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/4f28899228fb3ad0126897876f147ca15026151e.tar.gz",
+    strip_prefix = "bazel-compile-commands-extractor-4f28899228fb3ad0126897876f147ca15026151e",
+)
+load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup")
+hedron_compile_commands_setup()
+load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive")
+hedron_compile_commands_setup_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive_transitive()
+
+
 http_archive(
     name = "rules_foreign_cc",
     sha256 = "69023642d5781c68911beda769f91fcbc8ca48711db935a75da7f6536b65047f",
@@ -238,6 +254,19 @@ http_archive(
     ],
 )
 
+http_archive(
+    name = "nlohmann_json",
+    build_file = "//third_party:json.BUILD",
+    sha256 = "95651d7d1fcf2e5c3163c3d37df6d6b3e9e5027299e6bd050d157322ceda9ac9",
+    strip_prefix = "json-3.11.2",
+    url = "https://github.com/nlohmann/json/archive/refs/tags/v3.11.2.zip",
+)
+
+bind(
+    name = "json",
+    actual = "@nlohmann_json//:json",
+)
+
 load("@com_resdb_nexres//:repositories.bzl", "nexres_repositories")
 
 nexres_repositories()
diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp 
b/ecosystem/graphql/service/http_server/crow_service.cpp
index 5642ebb7..03c738c8 100644
--- a/ecosystem/graphql/service/http_server/crow_service.cpp
+++ b/ecosystem/graphql/service/http_server/crow_service.cpp
@@ -28,32 +28,36 @@
 #include <sys/types.h>
 #include <unistd.h>
 
-#include <ctime>
 #include <chrono>
+#include <cstdint>
+#include <ctime>
 #include <fstream>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <thread>
-
-#include <mutex>
 #include <unordered_set>
 
 using crow::request;
 using crow::response;
 using resdb::BatchUserRequest;
-using resdb::ResDBConfig;
-using resdb::ReplicaState;
-using resdb::ResConfigData;
+using resdb::CertificateInfo;
 using resdb::KeyInfo;
 using resdb::ReplicaInfo;
-using resdb::CertificateInfo;
+using resdb::ReplicaState;
+using resdb::ResConfigData;
+using resdb::ResDBConfig;
 
 namespace sdk {
 
 CrowService::CrowService(ResDBConfig client_config, ResDBConfig server_config,
                          uint16_t port_num)
-    : client_config_(client_config), server_config_(server_config), 
port_num_(port_num),
-      kv_client_(client_config_), state_client_(client_config_), 
txn_client_(server_config_) {
+    : client_config_(client_config),
+      server_config_(server_config),
+      port_num_(port_num),
+      kv_client_(client_config_),
+      state_client_(client_config_),
+      txn_client_(server_config_) {
   GetAllBlocks(100, true, false);
 }
 
@@ -67,26 +71,26 @@ void CrowService::run() {
   CROW_ROUTE(app, "/v1/transactions")
   ([this](const crow::request &req, response &res) {
     uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
-      std::chrono::system_clock::now().time_since_epoch()
-    ).count();
+                            std::chrono::system_clock::now().time_since_epoch())
+                            .count();
 
     if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) {
       res.code = 503;
       res.set_header("Content-Type", "text/plain");
-      res.end("Get all transactions functionality on cooldown (" + 
-        std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS -  cur_time) +
-        " ms left)");
+      res.end(
+          "Get all transactions functionality on cooldown (" +
+          std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) +
+          " ms left)");
     } else {
       last_db_scan_time = cur_time;
 
       auto values = kv_client_.GetAllValues();
       if (values != nullptr) {
-        //LOG(INFO) << "client getallvalues value = " << values->c_str();
+        // LOG(INFO) << "client getallvalues value = " << values->c_str();
 
         // Send updated blocks list to websocket
         if (users.size() > 0) {
-          for (auto u : users)
-            u->send_text("Update blocks");
+          for (auto u : users) u->send_text("Update blocks");
         }
 
         num_transactions_++;
@@ -110,8 +114,7 @@ void CrowService::run() {
 
       // Send updated blocks list to websocket
       if (users.size() > 0) {
-        for (auto u : users)
-          u->send_text("Update blocks");
+        for (auto u : users) u->send_text("Update blocks");
       }
 
       num_transactions_++;
@@ -135,8 +138,7 @@ void CrowService::run() {
 
       // Send updated blocks list to websocket
       if (users.size() > 0) {
-        for (auto u : users)
-          u->send_text("Update blocks");
+        for (auto u : users) u->send_text("Update blocks");
       }
 
       num_transactions_++;
@@ -153,44 +155,77 @@ void CrowService::run() {
   // Commit a key-value pair, extracting the id parameter from the JSON
   // object and setting the value as the entire JSON object
   CROW_ROUTE(app, "/v1/transactions/commit")
-  .methods("POST"_method)([this](const request& req) {
-    std::string body = req.body;
-    LOG(INFO) << "body: " << body;
-
-    // Parse transaction JSON
-    rapidjson::Document doc;
-    doc.Parse(body.c_str());
-    if (!doc.IsObject() || !doc.HasMember("id")) {
-      response res(400, "Invalid transaction format"); // Bad Request
-      res.set_header("Content-Type", "text/plain");
-      return res;
-    }
-    const std::string id = doc["id"].GetString();
-    const std::string value = body;
+      .methods("POST"_method)([this](const request &req) {
+        std::string body = req.body;
+        LOG(INFO) << "body: " << body;
+
+        // Parse transaction JSON
+        rapidjson::Document doc;
+        doc.Parse(body.c_str());
+        if (!doc.IsObject() || !doc.HasMember("id")) {
+          response res(400, "Invalid transaction format");  // Bad Request
+          res.set_header("Content-Type", "text/plain");
+          return res;
+        }
+        const std::string id = doc["id"].GetString();
+        const std::string value = body;
 
-    // Set key-value pair in kv server
-    int retval = kv_client_.Set(id, value);
+        // Set key-value pair in kv server
+        int retval = kv_client_.Set(id, value);
 
-    if (retval != 0) {
-      LOG(ERROR) << "Error when trying to commit id " << id;
-      response res(500, "id: " + id);
-      res.set_header("Content-Type", "text/plain");
-      return res;
-    }
-    LOG(INFO) << "Set " << id << " to " << value;
+        if (retval != 0) {
+          LOG(ERROR) << "Error when trying to commit id " << id;
+          response res(500, "id: " + id);
+          res.set_header("Content-Type", "text/plain");
+          return res;
+        }
+        LOG(INFO) << "Set " << id << " to " << value;
 
-    // Send updated blocks list to websocket
-    if (users.size() > 0) {
-      for (auto u : users)
-        u->send_text("Update blocks");
-    }
+        // Send updated blocks list to websocket
+        if (users.size() > 0) {
+          for (auto u : users) u->send_text("Update blocks");
+        }
 
-    num_transactions_++;
+        num_transactions_++;
 
-    response res(201, "id: " + id);  // Created status code
-    res.set_header("Content-Type", "text/plain");
-    return res;
-  });
+        response res(201, "id: " + id);  // Created status code
+        res.set_header("Content-Type", "text/plain");
+        return res;
+      });
+
+  CROW_ROUTE(app, "/v2/transactions/commit")
+      .methods("POST"_method)([this](const request &req) {
+        std::string body = req.body;
+        LOG(INFO) << "body: " << body;
+
+        // Parse transaction JSON
+        rapidjson::Document doc;
+        doc.Parse(body.c_str());
+        if (!doc.IsObject() || !doc.HasMember("id")) {
+          response res(400, "Invalid transaction format");  // Bad Request
+          res.set_header("Content-Type", "text/plain");
+          return res;
+        }
+        const std::string id = doc["id"].GetString();
+        const std::string value = body;
+
+        // Set key-value pair in kv server
+        int64_t seq_number = kv_client_.SetWithSeq(id, value);
+        crow::json::wvalue resp;
+        resp["id"] = id;
+        resp["seq"] = seq_number;
+
+        if (seq_number == static_cast<int64_t>(-1)) {
+          LOG(ERROR) << "Error when trying to commit id " << id;
+          response res(500, resp);
+          res.set_header("Content-Type", "application/json");
+          return res;
+        }
+
+        response res(201, resp);
+        res.set_header("Content-Type", "application/json");
+        return res;
+      });
 
   CROW_ROUTE(app, "/v1/blocks")
   ([this](const crow::request &req, response &res) {
@@ -237,8 +272,7 @@ void CrowService::run() {
       if (request.ParseFromString(txn.second)) {
         LOG(INFO) << request.DebugString();
 
-        if (!first_iteration)
-          cur_batch_str.append(",");
+        if (!first_iteration) cur_batch_str.append(",");
         first_iteration = false;
 
         // id
@@ -255,13 +289,12 @@ void CrowService::run() {
           kv_request.ParseFromString(sub_req.request().data());
           std::string kv_request_json = ParseKVRequest(kv_request);
 
-          if (!first_transaction)
-            cur_batch_str.append(",");
+          if (!first_transaction) cur_batch_str.append(",");
           first_transaction = false;
           cur_batch_str.append(kv_request_json);
           cur_batch_str.append("\n");
         }
-        cur_batch_str.append("]"); // close transactions list
+        cur_batch_str.append("]");  // close transactions list
 
         // size
         cur_batch_str.append(", \"size\": " +
@@ -300,18 +333,18 @@ void CrowService::run() {
 
   // For metadata table on the Explorer
   CROW_ROUTE(app, "/populatetable")
-  ([this](const crow::request& req, response& res) {
-
+  ([this](const crow::request &req, response &res) {
     absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState();
-       if(!state_or.ok()){
-                 LOG(ERROR)<<"get state fail";
-               res.set_header("Content-Type", "application/json");
-               res.end("");
-               return;
-       }
+    if (!state_or.ok()) {
+      LOG(ERROR) << "get state fail";
+      res.set_header("Content-Type", "application/json");
+      res.end("");
+      return;
+    }
 
     ResConfigData config_data = (*state_or).replica_config();
-    ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), 
CertificateInfo());
+    ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(),
+                              CertificateInfo());
 
     uint32_t replica_num = server_config.GetReplicaNum();
     uint32_t worker_num = server_config.GetWorkerNum();
@@ -322,7 +355,8 @@ void CrowService::run() {
     uint32_t output_worker_num = server_config.GetOutputWorkerNum();
     int client_timeout_ms = server_config.GetClientTimeoutMs();
     int min_data_receive_num = server_config.GetMinDataReceiveNum();
-    size_t max_malicious_replica_num = 
server_config.GetMaxMaliciousReplicaNum();
+    size_t max_malicious_replica_num =
+        server_config.GetMaxMaliciousReplicaNum();
     int checkpoint_water_mark = server_config.GetCheckPointWaterMark();
 
     // Don't read the ledger if first commit time is known
@@ -336,12 +370,12 @@ void CrowService::run() {
         res.end("get replica state fail");
       };
 
-      for (auto& txn : *resp) {
+      for (auto &txn : *resp) {
         BatchUserRequest request;
         KVRequest kv_request;
         if (request.ParseFromString(txn.second)) {
           // transactions
-          for (auto& sub_req : request.user_requests()) {
+          for (auto &sub_req : request.user_requests()) {
             kv_request.ParseFromString(sub_req.request().data());
             std::string kv_request_json = ParseKVRequest(kv_request);
           }
@@ -352,11 +386,12 @@ void CrowService::run() {
       }
     }
 
-    uint64_t epoch_time = std::chrono::duration_cast<std::chrono::seconds>(
-      std::chrono::system_clock::now().time_since_epoch()
-    ).count();
-    uint64_t chain_age = first_commit_time_ == 0 ? 
-                          0 : epoch_time - first_commit_time_;
+    uint64_t epoch_time =
+        std::chrono::duration_cast<std::chrono::seconds>(
+            std::chrono::system_clock::now().time_since_epoch())
+            .count();
+    uint64_t chain_age =
+        first_commit_time_ == 0 ? 0 : epoch_time - first_commit_time_;
 
     auto block_num_resp = txn_client_.GetBlockNumbers();
     if (!block_num_resp.ok()) {
@@ -365,21 +400,23 @@ void CrowService::run() {
     }
 
     std::string values = "";
-    values.append("[{ \"replicaNum\": " + std::to_string(replica_num)
-                      + ", \"workerNum\" : " + std::to_string(worker_num) 
-                      + ", \"clientBatchNum\" : " + 
std::to_string(client_batch_num)
-                      + ", \"maxProcessTxn\" : " + 
std::to_string(max_process_txn) 
-                      + ", \"clientBatchWaitTime\" : " + 
std::to_string(client_batch_wait_time)
-                      + ", \"inputWorkerNum\" : " + 
std::to_string(input_worker_num)
-                      + ", \"outputWorkerNum\" : " + 
std::to_string(output_worker_num)
-                      + ", \"clientTimeoutMs\" : " + 
std::to_string(client_timeout_ms)
-                      + ", \"minDataReceiveNum\" : " + 
std::to_string(min_data_receive_num)
-                      + ", \"maxMaliciousReplicaNum\" : " + 
std::to_string(max_malicious_replica_num)
-                      + ", \"checkpointWaterMark\" : " + 
std::to_string(checkpoint_water_mark)
-                      + ", \"transactionNum\" : " + 
std::to_string(num_transactions_)
-                      + ", \"blockNum\" : " + std::to_string(*block_num_resp)
-                      + ", \"chainAge\" : " + std::to_string(chain_age)
-                      + "}]");
+    values.append(
+        "[{ \"replicaNum\": " + std::to_string(replica_num) +
+        ", \"workerNum\" : " + std::to_string(worker_num) +
+        ", \"clientBatchNum\" : " + std::to_string(client_batch_num) +
+        ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) +
+        ", \"clientBatchWaitTime\" : " +
+        std::to_string(client_batch_wait_time) +
+        ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) +
+        ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) +
+        ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) +
+        ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) +
+        ", \"maxMaliciousReplicaNum\" : " +
+        std::to_string(max_malicious_replica_num) +
+        ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) +
+        ", \"transactionNum\" : " + std::to_string(num_transactions_) +
+        ", \"blockNum\" : " + std::to_string(*block_num_resp) +
+        ", \"chainAge\" : " + std::to_string(chain_age) + "}]");
     LOG(INFO) << std::string(values.c_str());
     res.set_header("Content-Type", "application/json");
     res.end(std::string(values.c_str()));
@@ -400,10 +437,8 @@ std::string CrowService::GetAllBlocks(int batch_size, bool 
increment_txn_count,
   bool first_batch = true;
   while (full_batches) {
     std::string cur_batch_str = "";
-    if (!first_batch)
-      cur_batch_str.append(",\n");
-    if (batch_size > 1 && make_sublists)
-      cur_batch_str.append("[");
+    if (!first_batch) cur_batch_str.append(",\n");
+    if (batch_size > 1 && make_sublists) cur_batch_str.append("[");
     first_batch = false;
 
     int max_seq = min_seq + batch_size - 1;
@@ -423,7 +458,7 @@ std::string CrowService::GetAllBlocks(int batch_size, bool 
increment_txn_count,
       cur_size++;
       if (request.ParseFromString(txn.second)) {
         if (!first_batch_element) cur_batch_str.append(",");
-        
+
         first_batch_element = false;
 
         // id
@@ -440,15 +475,14 @@ std::string CrowService::GetAllBlocks(int batch_size, 
bool increment_txn_count,
           kv_request.ParseFromString(sub_req.request().data());
           std::string kv_request_json = ParseKVRequest(kv_request);
 
-          if (!first_transaction)
-            cur_batch_str.append(",");
+          if (!first_transaction) cur_batch_str.append(",");
           first_transaction = false;
           cur_batch_str.append(kv_request_json);
           cur_batch_str.append("\n");
 
           if (increment_txn_count) num_transactions_++;
         }
-        cur_batch_str.append("]"); // close transactions list
+        cur_batch_str.append("]");  // close transactions list
 
         // size
         cur_batch_str.append(", \"size\": " +
@@ -462,11 +496,9 @@ std::string CrowService::GetAllBlocks(int batch_size, bool 
increment_txn_count,
       cur_batch_str.append("}\n");
     }
     full_batches = cur_size == batch_size;
-    if (batch_size > 1 && make_sublists)
-      cur_batch_str.append("]");
+    if (batch_size > 1 && make_sublists) cur_batch_str.append("]");
 
-    if (cur_size > 0)
-      values.append(cur_batch_str);
+    if (cur_size > 0) values.append(cur_batch_str);
 
     min_seq += batch_size;
   }
@@ -479,25 +511,25 @@ std::string CrowService::GetAllBlocks(int batch_size, 
bool increment_txn_count,
 // Helper function used by the blocks endpoints to create JSON strings
 std::string CrowService::ParseKVRequest(const KVRequest &kv_request) {
   rapidjson::Document doc;
-  if (kv_request.cmd() == 1) { // SET
+  if (kv_request.cmd() == 1) {  // SET
     doc.SetObject();
     rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
     rapidjson::Value val(rapidjson::kObjectType);
     doc.AddMember("cmd", "SET", allocator);
     doc.AddMember("key", kv_request.key(), allocator);
     doc.AddMember("value", kv_request.value(), allocator);
-  } else if (kv_request.cmd() == 2) { // GET
+  } else if (kv_request.cmd() == 2) {  // GET
     doc.SetObject();
     rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
     rapidjson::Value val(rapidjson::kObjectType);
     doc.AddMember("cmd", "GET", allocator);
     doc.AddMember("key", kv_request.key(), allocator);
-  } else if (kv_request.cmd() == 3) { // GETALLVALUES
+  } else if (kv_request.cmd() == 3) {  // GETALLVALUES
     doc.SetObject();
     rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
     rapidjson::Value val(rapidjson::kObjectType);
     doc.AddMember("cmd", "GETALLVALUES", allocator);
-  } else if (kv_request.cmd() == 4) { // GETRANGE
+  } else if (kv_request.cmd() == 4) {  // GETRANGE
     doc.SetObject();
     rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
     rapidjson::Value val(rapidjson::kObjectType);
@@ -514,19 +546,19 @@ std::string CrowService::ParseKVRequest(const KVRequest 
&kv_request) {
 
 std::string CrowService::ParseCreateTime(uint64_t createtime) {
   std::string timestr = "";
-  uint64_t sec = createtime / 1000000; // see 
resilientdb/common/utils/utils.cpp
+  uint64_t sec =
+      createtime / 1000000;  // see resilientdb/common/utils/utils.cpp
 
   std::tm *tm_gmt = std::gmtime((time_t *)&sec);
   int year = tm_gmt->tm_year + 1900;
-  int month = tm_gmt->tm_mon; // 0-indexed
+  int month = tm_gmt->tm_mon;  // 0-indexed
   int day = tm_gmt->tm_mday;
 
   std::string months[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun",
                             "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
 
   // Using date time string format to support the Explorer transaction chart
-  if (day < 10)
-    timestr += "0";
+  if (day < 10) timestr += "0";
   timestr += std::to_string(day) + " " + months[month] + " " +
              std::to_string(year) + " ";
 
@@ -534,16 +566,13 @@ std::string CrowService::ParseCreateTime(uint64_t 
createtime) {
   std::string min_str = std::to_string(tm_gmt->tm_min);
   std::string sec_str = std::to_string(tm_gmt->tm_sec);
 
-  if (tm_gmt->tm_hour < 10)
-    hour_str = "0" + hour_str;
-  if (tm_gmt->tm_min < 10)
-    min_str = "0" + min_str;
-  if (tm_gmt->tm_sec < 10)
-    sec_str = "0" + sec_str;
+  if (tm_gmt->tm_hour < 10) hour_str = "0" + hour_str;
+  if (tm_gmt->tm_min < 10) min_str = "0" + min_str;
+  if (tm_gmt->tm_sec < 10) sec_str = "0" + sec_str;
 
   timestr += hour_str + ":" + min_str + ":" + sec_str + " GMT";
 
   return timestr;
 }
 
-} // namespace sdk
+}  // namespace sdk
diff --git a/ecosystem/graphql/service/kv_service/proto/kv_server.proto 
b/ecosystem/graphql/service/kv_service/proto/kv_server.proto
index 238a009e..f461f1e0 100644
--- a/ecosystem/graphql/service/kv_service/proto/kv_server.proto
+++ b/ecosystem/graphql/service/kv_service/proto/kv_server.proto
@@ -40,5 +40,6 @@ message KVRequest {
 message KVResponse {
     string key = 1;
     bytes value = 2;
+    uint64 seq = 5;
 }
 
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp 
b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
index 15edcdd5..2273cf98 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
@@ -36,6 +36,21 @@ int ResDBKVClient::Set(const std::string &key, const 
std::string &data) {
   return SendRequest(request);
 }
 
+int64_t ResDBKVClient::SetWithSeq(const std::string &key,
+                                   const std::string &data) {
+  KVRequest request;
+  request.set_cmd(KVRequest::SET);
+  request.set_key(key);
+  request.set_value(data);
+  KVResponse response;
+  int ret = SendRequest(request, &response);
+  if (ret != 0) {
+    LOG(ERROR) << "send request fail, ret:" << ret;
+    return -1;
+  }
+  return response.seq();
+}
+
 std::unique_ptr<std::string> ResDBKVClient::Get(const std::string &key) {
   KVRequest request;
   request.set_cmd(KVRequest::GET);
@@ -61,9 +76,8 @@ std::unique_ptr<std::string> ResDBKVClient::GetAllValues() {
   return std::make_unique<std::string>(response.value());
 }
 
-std::unique_ptr<std::string>
-ResDBKVClient::GetRange(const std::string &min_key,
-                        const std::string &max_key) {
+std::unique_ptr<std::string> ResDBKVClient::GetRange(
+    const std::string &min_key, const std::string &max_key) {
   KVRequest request;
   request.set_cmd(KVRequest::GETRANGE);
   request.set_key(min_key);
@@ -77,4 +91,4 @@ ResDBKVClient::GetRange(const std::string &min_key,
   return std::make_unique<std::string>(response.value());
 }
 
-} // namespace sdk
+}  // namespace sdk
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.h 
b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
index e08b41a5..ac35afc0 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.h
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
@@ -29,6 +29,7 @@ public:
   ResDBKVClient(const resdb::ResDBConfig &config);
 
   int Set(const std::string &key, const std::string &data);
+  int64_t SetWithSeq(const std::string &key, const std::string &data);
   std::unique_ptr<std::string> Get(const std::string &key);
   std::unique_ptr<std::string> GetAllValues();
   std::unique_ptr<std::string> GetRange(const std::string &min_key,
diff --git a/ecosystem/graphql/third_party/BUILD 
b/ecosystem/graphql/third_party/BUILD
index 25da0d1f..3e6e468e 100644
--- a/ecosystem/graphql/third_party/BUILD
+++ b/ecosystem/graphql/third_party/BUILD
@@ -35,3 +35,10 @@ cc_library(
         "@rapidjson",
     ],
 )
+
+cc_library(
+    name = "json",
+    deps = [
+        "@nlohmann_json//:json",
+    ],
+)
\ No newline at end of file
diff --git a/ecosystem/graphql/third_party/BUILD 
b/ecosystem/graphql/third_party/json.BUILD
similarity index 78%
copy from ecosystem/graphql/third_party/BUILD
copy to ecosystem/graphql/third_party/json.BUILD
index 25da0d1f..f0fd8e0b 100644
--- a/ecosystem/graphql/third_party/BUILD
+++ b/ecosystem/graphql/third_party/json.BUILD
@@ -16,22 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-#
-
-package(default_visibility = ["//visibility:public"])
 
-load("@rules_foreign_cc//foreign_cc:defs.bzl", "configure_make", "make")
+licenses(["notice"])
+exports_files(["LICENSE"])
 
-cc_library(
-    name = "crow",
-    deps = [
-        "@com_crowcpp_crow//:crow",
-    ],
-)
+package(default_visibility = ["//visibility:public"])
 
 cc_library(
-    name = "rapidjson",
-    deps = [
-        "@rapidjson",
-    ],
-)
+    name = "json",
+    hdrs = glob(["single_include/nlohmann/*.hpp"]),
+    includes = ["single_include"],
+    visibility = ["//visibility:public"],
+)
\ No newline at end of file
diff --git a/executor/common/transaction_manager.cpp 
b/executor/common/transaction_manager.cpp
index 74df05c9..fa28f6d0 100644
--- a/executor/common/transaction_manager.cpp
+++ b/executor/common/transaction_manager.cpp
@@ -20,6 +20,7 @@
 #include "executor/common/transaction_manager.h"
 
 #include <glog/logging.h>
+#include <sys/types.h>
 
 namespace resdb {
 
@@ -40,6 +41,11 @@ std::unique_ptr<google::protobuf::Message> 
TransactionManager::ParseData(
   return nullptr;
 }
 
+std::unique_ptr<google::protobuf::Message> TransactionManager::ParseData(
+    const std::string& data,uint64_t seq) {
+  return nullptr;
+}
+
 std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
 TransactionManager::Prepare(const BatchUserRequest& request) {
   std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
@@ -48,17 +54,17 @@ TransactionManager::Prepare(const BatchUserRequest& 
request) {
   {
     for (auto& sub_request : request.user_requests()) {
       std::unique_ptr<google::protobuf::Message> response =
-          ParseData(sub_request.request().data());
+          ParseData(sub_request.request().data(), request.seq());
       batch_response->push_back(std::move(response));
     }
     // LOG(ERROR)<<"prepare data size:"<<batch_response.size();
   }
-
   return batch_response;
 }
 
 std::unique_ptr<std::string> TransactionManager::ExecuteRequest(
     const google::protobuf::Message& request) {
+  LOG(ERROR) << "At TransactionManger::ExecuteRequest" << std::endl;
   return nullptr;
 }
 
@@ -78,7 +84,6 @@ std::vector<std::unique_ptr<std::string>> 
TransactionManager::ExecuteBatchData(
   return ret;
 }
 
-
 std::unique_ptr<BatchUserResponse> TransactionManager::ExecuteBatch(
     const BatchUserRequest& request) {
   std::unique_ptr<BatchUserResponse> batch_response =
diff --git a/executor/common/transaction_manager.h 
b/executor/common/transaction_manager.h
index b1d564ac..5ea58718 100644
--- a/executor/common/transaction_manager.h
+++ b/executor/common/transaction_manager.h
@@ -56,8 +56,11 @@ class TransactionManager {
  protected:
   virtual std::unique_ptr<google::protobuf::Message> ParseData(
       const std::string& data);
+  virtual std::unique_ptr<google::protobuf::Message> ParseData(
+      const std::string& data,uint64_t seq);
   virtual std::unique_ptr<std::string> ExecuteRequest(
       const google::protobuf::Message& request);
+
  private:
   bool is_out_of_order_ = false;
   bool need_response_ = true;
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index c587f592..f6fe8b86 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -36,6 +36,18 @@ std::unique_ptr<google::protobuf::Message> 
KVExecutor::ParseData(
     LOG(ERROR) << "parse data fail";
     return nullptr;
   }
+  kv_request->set_seq(0);
+  return kv_request;
+}
+
+std::unique_ptr<google::protobuf::Message> KVExecutor::ParseData(
+    const std::string& request, uint64_t seq) {
+  std::unique_ptr<KVRequest> kv_request = std::make_unique<KVRequest>();
+  if (!kv_request->ParseFromString(request)) {
+    LOG(ERROR) << "parse data fail";
+    return nullptr;
+  }
+  kv_request->set_seq(seq);
   return kv_request;
 }
 
@@ -46,6 +58,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteRequest(
 
   if (kv_request.cmd() == KVRequest::SET) {
     Set(kv_request.key(), kv_request.value());
+    kv_response.set_seq(kv_request.seq());
   } else if (kv_request.cmd() == KVRequest::GET) {
     kv_response.set_value(Get(kv_request.key()));
   } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h
index fef12597..19ee9c32 100644
--- a/executor/kv/kv_executor.h
+++ b/executor/kv/kv_executor.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <sys/types.h>
 #include <map>
 #include <optional>
 #include <unordered_map>
@@ -38,6 +39,8 @@ class KVExecutor : public TransactionManager {
 
   std::unique_ptr<google::protobuf::Message> ParseData(
       const std::string& request) override;
+  std::unique_ptr<google::protobuf::Message> ParseData(
+      const std::string& request,uint64_t seq) override;
   std::unique_ptr<std::string> ExecuteRequest(
       const google::protobuf::Message& kv_request) override;
  protected:
diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index a62e55f5..12f4b49a 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -20,6 +20,7 @@
 #include "platform/consensus/execution/transaction_executor.h"
 
 #include <glog/logging.h>
+
 #include "common/utils/utils.h"
 
 namespace resdb {
@@ -36,7 +37,6 @@ TransactionExecutor::TransactionExecutor(
       execute_queue_("execute"),
       stop_(false),
       duplicate_manager_(nullptr) {
-
   memset(blucket_, 0, sizeof(blucket_));
   global_stats_ = Stats::GetGlobalStats();
   ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
@@ -198,9 +198,9 @@ void TransactionExecutor::OrderMessage() {
 }
 
 void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
-    global_stats_->IncCommit();
-    message->set_commit_time(GetCurrentTime());
-    execute_queue_.Push(std::move(message));
+  global_stats_->IncCommit();
+  message->set_commit_time(GetCurrentTime());
+  execute_queue_.Push(std::move(message));
 }
 
 void TransactionExecutor::ExecuteMessage() {
@@ -244,7 +244,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
   //          << batch_request.user_requests_size()<<" proxy
   //          id:"<<request->proxy_id();
   std::unique_ptr<BatchUserResponse> response;
-   global_stats_->GetTransactionDetails(batch_request);
+  global_stats_->GetTransactionDetails(batch_request);
   if (transaction_manager_) {
     response = transaction_manager_->ExecuteBatch(batch_request);
   }
@@ -258,7 +258,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
   RegisterExecute(request->seq());
   std::unique_ptr<BatchUserRequest> batch_request = nullptr;
   std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data;
-  std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
+  std::vector<std::unique_ptr<google::protobuf::Message>>* data_p = nullptr;
   BatchUserRequest* batch_request_p = nullptr;
 
   // Execute the request, then send the response back to the user.
@@ -276,7 +276,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
     batch_request_p = batch_request.get();
     // LOG(ERROR)<<"get data from req:";
   } else {
-  assert(batch_request_p);
+    assert(batch_request_p);
     batch_request_p->set_seq(request->seq());
     batch_request_p->set_proxy_id(request->proxy_id());
     // LOG(ERROR)<<" get from cache:"<<uid;
@@ -295,38 +295,32 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
     } else {
       std::vector<std::unique_ptr<std::string>> response_v;
 
-      if(data_p == nullptr) {
-        int64_t start_time = GetCurrentTime();
-        data = std::move(transaction_manager_->Prepare(*batch_request_p));
-        int64_t end_time = GetCurrentTime();
-        if (end_time - start_time > 10) {
-          // LOG(ERROR)<<"exec data done:"<<uid<<" wait
-          // time:"<<(end_time-start_time);
-        }
+      if (data_p == nullptr) {
+        data = transaction_manager_->Prepare(*batch_request_p);
         data_p = data.get();
       }
 
       WaitForExecute(request->seq());
-           if(data_p->empty() || (*data_p)[0] == nullptr){
-                   response = transaction_manager_->ExecuteBatch(*batch_request_p);
-           }
-           else {
-                   response_v = transaction_manager_->ExecuteBatchData(*data_p);
-           }
+      if (data_p->empty() || (*data_p)[0] == nullptr) {
+        response = transaction_manager_->ExecuteBatch(*batch_request_p);
+      } else {
+        response_v = transaction_manager_->ExecuteBatchData(*data_p);
+      }
       FinishExecute(request->seq());
 
-      if(response == nullptr){
-             response = std::make_unique<BatchUserResponse>();
-             for (auto& s : response_v) {
-                     response->add_response()->swap(*s);
-             }
+      if (response == nullptr) {
+        response = std::make_unique<BatchUserResponse>();
+        for (auto& s : response_v) {
+          response->add_response()->swap(*s);
+        }
       }
     }
   }
   // LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
 
-  if (duplicate_manager_ && batch_request_p) {       
-    duplicate_manager_->AddExecuted(batch_request_p->hash(), batch_request_p->seq());              
+  if (duplicate_manager_ && batch_request_p) {
+    duplicate_manager_->AddExecuted(batch_request_p->hash(),
+                                    batch_request_p->seq());
   }
 
   if (response == nullptr) {
@@ -334,13 +328,14 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
   }
   global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
   response->set_proxy_id(batch_request_p->proxy_id());
-  response->set_createtime(batch_request_p->createtime() + request->queuing_time());
+  response->set_createtime(batch_request_p->createtime() +
+                           request->queuing_time());
   response->set_local_id(batch_request_p->local_id());
 
   response->set_seq(request->seq());
 
   if (post_exec_func_) {
-    post_exec_func_(std::move(request), std::move(response));
+    post_exec_func_(std::move(request), std::move(response)); //TODO: request_tracking
   }
 
   global_stats_->IncExecuteDone();
@@ -350,7 +345,6 @@ void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
   duplicate_manager_ = manager;
 }
 
-
 bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
   std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
   auto it = flag_[uid % mod].find(uid);
@@ -364,9 +358,9 @@ bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
     if (flag_[uid % mod][uid] & Start_Execute) {
       return false;
     }
-  } else if(f == Start_Execute){
+  } else if (f == Start_Execute) {
     if (flag_[uid % mod][uid] & End_Prepare) {
-    //if (flag_[uid % mod][uid] & Start_Prepare) {
+      // if (flag_[uid % mod][uid] & Start_Prepare) {
       return false;
     }
   }
@@ -383,10 +377,10 @@ void TransactionExecutor::ClearPromise(uint64_t uid) {
   // LOG(ERROR)<<"CLEAR UID:"<<uid;
   assert(it != pre_[uid % mod].end());
   assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
-  //assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
-  //assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
-  //data_[uid%mod].erase(data_[uid%mod].find(uid));
-  //req_[uid%mod].erase(req_[uid%mod].find(uid));
+  // assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+  // assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
+  // data_[uid%mod].erase(data_[uid%mod].find(uid));
+  // req_[uid%mod].erase(req_[uid%mod].find(uid));
   pre_[uid % mod].erase(it);
   flag_[uid % mod].erase(flag_[uid % mod].find(uid));
 }
@@ -406,8 +400,8 @@ std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid) {
   if (it == pre_[uid % mod].end()) {
     return nullptr;
   }
-  //return std::move(it->second);
-  // LOG(ERROR)<<"add future:"<<uid;
+  // return std::move(it->second);
+  //  LOG(ERROR)<<"add future:"<<uid;
   return std::make_unique<std::future<int>>(it->second->get_future());
 }
 
@@ -418,9 +412,9 @@ bool TransactionExecutor::AddFuture(uint64_t uid) {
     // LOG(ERROR)<<"add future:"<<uid;
     std::unique_ptr<std::promise<int>> p =
         std::make_unique<std::promise<int>>();
-    //auto f = std::make_unique<std::future<int>>(p->get_future());
+    // auto f = std::make_unique<std::future<int>>(p->get_future());
     pre_[uid % mod][uid] = std::move(p);
-    //pre_f_[uid % mod][uid] = std::move(f);
+    // pre_f_[uid % mod][uid] = std::move(f);
     flag_[uid % mod][uid] = 0;
     return true;
   }
@@ -445,13 +439,13 @@ void TransactionExecutor::PrepareMessage() {
     if (current_f == 0) {
       // commit has done
       // LOG(ERROR)<<" want prepare, commit started:"<<uid;
-//      ClearPromise(uid);
+      //      ClearPromise(uid);
       continue;
     }
 
-    std::promise<int>* p = GetPromise(uid) ;
+    std::promise<int>* p = GetPromise(uid);
     assert(p);
-    //LOG(ERROR)<<" prepare started:"<<uid;
+    // LOG(ERROR)<<" prepare started:"<<uid;
 
     // LOG(ERROR)<<" prepare uid:"<<uid;
 
@@ -473,27 +467,26 @@ void TransactionExecutor::PrepareMessage() {
     // id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
 
     std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
-     request_v = transaction_manager_->Prepare(*batch_request);
+        request_v = transaction_manager_->Prepare(*batch_request);
     {
       std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
-   //   assert(request_v);
+      //   assert(request_v);
       // assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
-      data_[uid%mod][uid] = std::move(request_v);
+      data_[uid % mod][uid] = std::move(request_v);
       req_[uid % mod][uid] = std::move(batch_request);
     }
-    //LOG(ERROR)<<"set promise:"<<uid;
+    // LOG(ERROR)<<"set promise:"<<uid;
     p->set_value(1);
     {
       int set_ret = SetFlag(uid, End_Prepare);
       if (set_ret == 0) {
         // LOG(ERROR)<<"commit interrupt:"<<uid;
-        //ClearPromise(uid);
+        // ClearPromise(uid);
       } else {
-        //LOG(ERROR)<<"prepare done:"<<uid;
+        // LOG(ERROR)<<"prepare done:"<<uid;
       }
     }
   }
 }
 
-
 }  // namespace resdb
diff --git a/proto/kv/kv.proto b/proto/kv/kv.proto
index 750058e7..d24b5fbd 100644
--- a/proto/kv/kv.proto
+++ b/proto/kv/kv.proto
@@ -48,6 +48,7 @@ message KVRequest {
     // For top history
     int32 top_number = 9;
     bytes smart_contract_request = 10;
+    uint64 seq = 11;
 }
 
 message ValueInfo {
@@ -69,6 +70,7 @@ message KVResponse {
     bytes value = 2;
     ValueInfo value_info = 3;
     Items items = 4;
+    uint64 seq = 5;
     bytes smart_contract_response = 10;
 }
 
diff --git a/service/tools/kv/server_tools/generate_keys_and_certs.sh b/service/tools/kv/server_tools/generate_keys_and_certs.sh
index 94fb4aab..261bbb79 100755
--- a/service/tools/kv/server_tools/generate_keys_and_certs.sh
+++ b/service/tools/kv/server_tools/generate_keys_and_certs.sh
@@ -31,4 +31,4 @@ CONFIG_PATH=$PWD/service/tools/config/
 PORT_BASE=10000
 CLIENT_NUM=1
 
-./service/tools/config/generate_keys_and_certs.sh ${WORKSPACE} ${CERT_PATH} ${CERT_PATH} ${CONFIG_PATH} ${CERT_PATH} ${CLIENT_NUM} ${PORT_BASE} ${iplist[@]} 
+./service/tools/config/generate_keys_and_certs.sh ${WORKSPACE} ${CERT_PATH} ${CERT_PATH} ${PORT_BASE} ${CLIENT_NUM} ${iplist[@]} 


Reply via email to