This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new bd1f6e3f Request Tracing
bd1f6e3f is described below
commit bd1f6e3f439f0ec2f3be516675785830fb2c9406
Author: harish876 <[email protected]>
AuthorDate: Mon Dec 15 04:40:49 2025 +0000
Request Tracing
---
ecosystem/graphql/WORKSPACE | 29 +++
.../graphql/service/http_server/crow_service.cpp | 265 ++++++++++++---------
.../service/kv_service/proto/kv_server.proto | 1 +
.../graphql/service/kv_service/resdb_kv_client.cpp | 22 +-
.../graphql/service/kv_service/resdb_kv_client.h | 1 +
ecosystem/graphql/third_party/BUILD | 7 +
.../graphql/third_party/{BUILD => json.BUILD} | 23 +-
executor/common/transaction_manager.cpp | 11 +-
executor/common/transaction_manager.h | 3 +
executor/kv/kv_executor.cpp | 13 +
executor/kv/kv_executor.h | 3 +
.../consensus/execution/transaction_executor.cpp | 95 ++++----
proto/kv/kv.proto | 2 +
.../kv/server_tools/generate_keys_and_certs.sh | 2 +-
14 files changed, 285 insertions(+), 192 deletions(-)
diff --git a/ecosystem/graphql/WORKSPACE b/ecosystem/graphql/WORKSPACE
index 2ae0146f..9a8f94ce 100644
--- a/ecosystem/graphql/WORKSPACE
+++ b/ecosystem/graphql/WORKSPACE
@@ -19,6 +19,22 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+ name = "hedron_compile_commands",
+ #Replace the commit hash (4f28899228fb3ad0126897876f147ca15026151e) with the latest commit hash from the repo
+ url =
"https://github.com/hedronvision/bazel-compile-commands-extractor/archive/4f28899228fb3ad0126897876f147ca15026151e.tar.gz",
+ strip_prefix =
"bazel-compile-commands-extractor-4f28899228fb3ad0126897876f147ca15026151e",
+)
+load("@hedron_compile_commands//:workspace_setup.bzl",
"hedron_compile_commands_setup")
+hedron_compile_commands_setup()
+load("@hedron_compile_commands//:workspace_setup_transitive.bzl",
"hedron_compile_commands_setup_transitive")
+hedron_compile_commands_setup_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl",
"hedron_compile_commands_setup_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl",
"hedron_compile_commands_setup_transitive_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive_transitive()
+
+
http_archive(
name = "rules_foreign_cc",
sha256 =
"69023642d5781c68911beda769f91fcbc8ca48711db935a75da7f6536b65047f",
@@ -238,6 +254,19 @@ http_archive(
],
)
+http_archive(
+ name = "nlohmann_json",
+ build_file = "//third_party:json.BUILD",
+ sha256 =
"95651d7d1fcf2e5c3163c3d37df6d6b3e9e5027299e6bd050d157322ceda9ac9",
+ strip_prefix = "json-3.11.2",
+ url = "https://github.com/nlohmann/json/archive/refs/tags/v3.11.2.zip",
+)
+
+bind(
+ name = "json",
+ actual = "@nlohmann_json//:json",
+)
+
load("@com_resdb_nexres//:repositories.bzl", "nexres_repositories")
nexres_repositories()
diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp
b/ecosystem/graphql/service/http_server/crow_service.cpp
index 5642ebb7..03c738c8 100644
--- a/ecosystem/graphql/service/http_server/crow_service.cpp
+++ b/ecosystem/graphql/service/http_server/crow_service.cpp
@@ -28,32 +28,36 @@
#include <sys/types.h>
#include <unistd.h>
-#include <ctime>
#include <chrono>
+#include <cstdint>
+#include <ctime>
#include <fstream>
#include <memory>
+#include <mutex>
#include <string>
#include <thread>
-
-#include <mutex>
#include <unordered_set>
using crow::request;
using crow::response;
using resdb::BatchUserRequest;
-using resdb::ResDBConfig;
-using resdb::ReplicaState;
-using resdb::ResConfigData;
+using resdb::CertificateInfo;
using resdb::KeyInfo;
using resdb::ReplicaInfo;
-using resdb::CertificateInfo;
+using resdb::ReplicaState;
+using resdb::ResConfigData;
+using resdb::ResDBConfig;
namespace sdk {
CrowService::CrowService(ResDBConfig client_config, ResDBConfig server_config,
uint16_t port_num)
- : client_config_(client_config), server_config_(server_config),
port_num_(port_num),
- kv_client_(client_config_), state_client_(client_config_),
txn_client_(server_config_) {
+ : client_config_(client_config),
+ server_config_(server_config),
+ port_num_(port_num),
+ kv_client_(client_config_),
+ state_client_(client_config_),
+ txn_client_(server_config_) {
GetAllBlocks(100, true, false);
}
@@ -67,26 +71,26 @@ void CrowService::run() {
CROW_ROUTE(app, "/v1/transactions")
([this](const crow::request &req, response &res) {
uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch()
- ).count();
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) {
res.code = 503;
res.set_header("Content-Type", "text/plain");
- res.end("Get all transactions functionality on cooldown (" +
- std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) +
- " ms left)");
+ res.end(
+ "Get all transactions functionality on cooldown (" +
+ std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) +
+ " ms left)");
} else {
last_db_scan_time = cur_time;
auto values = kv_client_.GetAllValues();
if (values != nullptr) {
- //LOG(INFO) << "client getallvalues value = " << values->c_str();
+ // LOG(INFO) << "client getallvalues value = " << values->c_str();
// Send updated blocks list to websocket
if (users.size() > 0) {
- for (auto u : users)
- u->send_text("Update blocks");
+ for (auto u : users) u->send_text("Update blocks");
}
num_transactions_++;
@@ -110,8 +114,7 @@ void CrowService::run() {
// Send updated blocks list to websocket
if (users.size() > 0) {
- for (auto u : users)
- u->send_text("Update blocks");
+ for (auto u : users) u->send_text("Update blocks");
}
num_transactions_++;
@@ -135,8 +138,7 @@ void CrowService::run() {
// Send updated blocks list to websocket
if (users.size() > 0) {
- for (auto u : users)
- u->send_text("Update blocks");
+ for (auto u : users) u->send_text("Update blocks");
}
num_transactions_++;
@@ -153,44 +155,77 @@ void CrowService::run() {
// Commit a key-value pair, extracting the id parameter from the JSON
// object and setting the value as the entire JSON object
CROW_ROUTE(app, "/v1/transactions/commit")
- .methods("POST"_method)([this](const request& req) {
- std::string body = req.body;
- LOG(INFO) << "body: " << body;
-
- // Parse transaction JSON
- rapidjson::Document doc;
- doc.Parse(body.c_str());
- if (!doc.IsObject() || !doc.HasMember("id")) {
- response res(400, "Invalid transaction format"); // Bad Request
- res.set_header("Content-Type", "text/plain");
- return res;
- }
- const std::string id = doc["id"].GetString();
- const std::string value = body;
+ .methods("POST"_method)([this](const request &req) {
+ std::string body = req.body;
+ LOG(INFO) << "body: " << body;
+
+ // Parse transaction JSON
+ rapidjson::Document doc;
+ doc.Parse(body.c_str());
+ if (!doc.IsObject() || !doc.HasMember("id")) {
+ response res(400, "Invalid transaction format"); // Bad Request
+ res.set_header("Content-Type", "text/plain");
+ return res;
+ }
+ const std::string id = doc["id"].GetString();
+ const std::string value = body;
- // Set key-value pair in kv server
- int retval = kv_client_.Set(id, value);
+ // Set key-value pair in kv server
+ int retval = kv_client_.Set(id, value);
- if (retval != 0) {
- LOG(ERROR) << "Error when trying to commit id " << id;
- response res(500, "id: " + id);
- res.set_header("Content-Type", "text/plain");
- return res;
- }
- LOG(INFO) << "Set " << id << " to " << value;
+ if (retval != 0) {
+ LOG(ERROR) << "Error when trying to commit id " << id;
+ response res(500, "id: " + id);
+ res.set_header("Content-Type", "text/plain");
+ return res;
+ }
+ LOG(INFO) << "Set " << id << " to " << value;
- // Send updated blocks list to websocket
- if (users.size() > 0) {
- for (auto u : users)
- u->send_text("Update blocks");
- }
+ // Send updated blocks list to websocket
+ if (users.size() > 0) {
+ for (auto u : users) u->send_text("Update blocks");
+ }
- num_transactions_++;
+ num_transactions_++;
- response res(201, "id: " + id); // Created status code
- res.set_header("Content-Type", "text/plain");
- return res;
- });
+ response res(201, "id: " + id); // Created status code
+ res.set_header("Content-Type", "text/plain");
+ return res;
+ });
+
+ // Commit a key-value pair, extracting the id parameter from the JSON
+ // object and storing the entire JSON body as the value. Unlike the v1
+ // route, the reply is a JSON object holding the id together with the
+ // sequence number returned by ResDBKVClient::SetWithSeq.
+ CROW_ROUTE(app, "/v2/transactions/commit")
+     .methods("POST"_method)([this](const request &req) {
+       std::string body = req.body;
+       LOG(INFO) << "body: " << body;
+
+       // Parse transaction JSON. GetString() on a member that is not a
+       // string trips a rapidjson assertion, so such bodies are rejected
+       // here together with non-objects and objects without an "id".
+       rapidjson::Document doc;
+       doc.Parse(body.c_str());
+       if (!doc.IsObject() || !doc.HasMember("id") ||
+           !doc["id"].IsString()) {
+         response res(400, "Invalid transaction format");  // Bad Request
+         res.set_header("Content-Type", "text/plain");
+         return res;
+       }
+       const std::string id = doc["id"].GetString();
+       const std::string value = body;
+
+       // Set key-value pair in kv server; -1 signals a failed request.
+       int64_t seq_number = kv_client_.SetWithSeq(id, value);
+       crow::json::wvalue resp;
+       resp["id"] = id;
+       resp["seq"] = seq_number;
+
+       if (seq_number == static_cast<int64_t>(-1)) {
+         LOG(ERROR) << "Error when trying to commit id " << id;
+         response res(500, resp);
+         res.set_header("Content-Type", "application/json");
+         return res;
+       }
+
+       response res(201, resp);  // Created status code
+       res.set_header("Content-Type", "application/json");
+       return res;
+     });
CROW_ROUTE(app, "/v1/blocks")
([this](const crow::request &req, response &res) {
@@ -237,8 +272,7 @@ void CrowService::run() {
if (request.ParseFromString(txn.second)) {
LOG(INFO) << request.DebugString();
- if (!first_iteration)
- cur_batch_str.append(",");
+ if (!first_iteration) cur_batch_str.append(",");
first_iteration = false;
// id
@@ -255,13 +289,12 @@ void CrowService::run() {
kv_request.ParseFromString(sub_req.request().data());
std::string kv_request_json = ParseKVRequest(kv_request);
- if (!first_transaction)
- cur_batch_str.append(",");
+ if (!first_transaction) cur_batch_str.append(",");
first_transaction = false;
cur_batch_str.append(kv_request_json);
cur_batch_str.append("\n");
}
- cur_batch_str.append("]"); // close transactions list
+ cur_batch_str.append("]"); // close transactions list
// size
cur_batch_str.append(", \"size\": " +
@@ -300,18 +333,18 @@ void CrowService::run() {
// For metadata table on the Explorer
CROW_ROUTE(app, "/populatetable")
- ([this](const crow::request& req, response& res) {
-
+ ([this](const crow::request &req, response &res) {
absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState();
- if(!state_or.ok()){
- LOG(ERROR)<<"get state fail";
- res.set_header("Content-Type", "application/json");
- res.end("");
- return;
- }
+ if (!state_or.ok()) {
+ LOG(ERROR) << "get state fail";
+ res.set_header("Content-Type", "application/json");
+ res.end("");
+ return;
+ }
ResConfigData config_data = (*state_or).replica_config();
- ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(),
CertificateInfo());
+ ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(),
+ CertificateInfo());
uint32_t replica_num = server_config.GetReplicaNum();
uint32_t worker_num = server_config.GetWorkerNum();
@@ -322,7 +355,8 @@ void CrowService::run() {
uint32_t output_worker_num = server_config.GetOutputWorkerNum();
int client_timeout_ms = server_config.GetClientTimeoutMs();
int min_data_receive_num = server_config.GetMinDataReceiveNum();
- size_t max_malicious_replica_num =
server_config.GetMaxMaliciousReplicaNum();
+ size_t max_malicious_replica_num =
+ server_config.GetMaxMaliciousReplicaNum();
int checkpoint_water_mark = server_config.GetCheckPointWaterMark();
// Don't read the ledger if first commit time is known
@@ -336,12 +370,12 @@ void CrowService::run() {
res.end("get replica state fail");
};
- for (auto& txn : *resp) {
+ for (auto &txn : *resp) {
BatchUserRequest request;
KVRequest kv_request;
if (request.ParseFromString(txn.second)) {
// transactions
- for (auto& sub_req : request.user_requests()) {
+ for (auto &sub_req : request.user_requests()) {
kv_request.ParseFromString(sub_req.request().data());
std::string kv_request_json = ParseKVRequest(kv_request);
}
@@ -352,11 +386,12 @@ void CrowService::run() {
}
}
- uint64_t epoch_time = std::chrono::duration_cast<std::chrono::seconds>(
- std::chrono::system_clock::now().time_since_epoch()
- ).count();
- uint64_t chain_age = first_commit_time_ == 0 ?
- 0 : epoch_time - first_commit_time_;
+ uint64_t epoch_time =
+ std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ uint64_t chain_age =
+ first_commit_time_ == 0 ? 0 : epoch_time - first_commit_time_;
auto block_num_resp = txn_client_.GetBlockNumbers();
if (!block_num_resp.ok()) {
@@ -365,21 +400,23 @@ void CrowService::run() {
}
std::string values = "";
- values.append("[{ \"replicaNum\": " + std::to_string(replica_num)
- + ", \"workerNum\" : " + std::to_string(worker_num)
- + ", \"clientBatchNum\" : " +
std::to_string(client_batch_num)
- + ", \"maxProcessTxn\" : " +
std::to_string(max_process_txn)
- + ", \"clientBatchWaitTime\" : " +
std::to_string(client_batch_wait_time)
- + ", \"inputWorkerNum\" : " +
std::to_string(input_worker_num)
- + ", \"outputWorkerNum\" : " +
std::to_string(output_worker_num)
- + ", \"clientTimeoutMs\" : " +
std::to_string(client_timeout_ms)
- + ", \"minDataReceiveNum\" : " +
std::to_string(min_data_receive_num)
- + ", \"maxMaliciousReplicaNum\" : " +
std::to_string(max_malicious_replica_num)
- + ", \"checkpointWaterMark\" : " +
std::to_string(checkpoint_water_mark)
- + ", \"transactionNum\" : " +
std::to_string(num_transactions_)
- + ", \"blockNum\" : " + std::to_string(*block_num_resp)
- + ", \"chainAge\" : " + std::to_string(chain_age)
- + "}]");
+ values.append(
+ "[{ \"replicaNum\": " + std::to_string(replica_num) +
+ ", \"workerNum\" : " + std::to_string(worker_num) +
+ ", \"clientBatchNum\" : " + std::to_string(client_batch_num) +
+ ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) +
+ ", \"clientBatchWaitTime\" : " +
+ std::to_string(client_batch_wait_time) +
+ ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) +
+ ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) +
+ ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) +
+ ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) +
+ ", \"maxMaliciousReplicaNum\" : " +
+ std::to_string(max_malicious_replica_num) +
+ ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark)
+
+ ", \"transactionNum\" : " + std::to_string(num_transactions_) +
+ ", \"blockNum\" : " + std::to_string(*block_num_resp) +
+ ", \"chainAge\" : " + std::to_string(chain_age) + "}]");
LOG(INFO) << std::string(values.c_str());
res.set_header("Content-Type", "application/json");
res.end(std::string(values.c_str()));
@@ -400,10 +437,8 @@ std::string CrowService::GetAllBlocks(int batch_size, bool
increment_txn_count,
bool first_batch = true;
while (full_batches) {
std::string cur_batch_str = "";
- if (!first_batch)
- cur_batch_str.append(",\n");
- if (batch_size > 1 && make_sublists)
- cur_batch_str.append("[");
+ if (!first_batch) cur_batch_str.append(",\n");
+ if (batch_size > 1 && make_sublists) cur_batch_str.append("[");
first_batch = false;
int max_seq = min_seq + batch_size - 1;
@@ -423,7 +458,7 @@ std::string CrowService::GetAllBlocks(int batch_size, bool
increment_txn_count,
cur_size++;
if (request.ParseFromString(txn.second)) {
if (!first_batch_element) cur_batch_str.append(",");
-
+
first_batch_element = false;
// id
@@ -440,15 +475,14 @@ std::string CrowService::GetAllBlocks(int batch_size,
bool increment_txn_count,
kv_request.ParseFromString(sub_req.request().data());
std::string kv_request_json = ParseKVRequest(kv_request);
- if (!first_transaction)
- cur_batch_str.append(",");
+ if (!first_transaction) cur_batch_str.append(",");
first_transaction = false;
cur_batch_str.append(kv_request_json);
cur_batch_str.append("\n");
if (increment_txn_count) num_transactions_++;
}
- cur_batch_str.append("]"); // close transactions list
+ cur_batch_str.append("]"); // close transactions list
// size
cur_batch_str.append(", \"size\": " +
@@ -462,11 +496,9 @@ std::string CrowService::GetAllBlocks(int batch_size, bool
increment_txn_count,
cur_batch_str.append("}\n");
}
full_batches = cur_size == batch_size;
- if (batch_size > 1 && make_sublists)
- cur_batch_str.append("]");
+ if (batch_size > 1 && make_sublists) cur_batch_str.append("]");
- if (cur_size > 0)
- values.append(cur_batch_str);
+ if (cur_size > 0) values.append(cur_batch_str);
min_seq += batch_size;
}
@@ -479,25 +511,25 @@ std::string CrowService::GetAllBlocks(int batch_size,
bool increment_txn_count,
// Helper function used by the blocks endpoints to create JSON strings
std::string CrowService::ParseKVRequest(const KVRequest &kv_request) {
rapidjson::Document doc;
- if (kv_request.cmd() == 1) { // SET
+ if (kv_request.cmd() == 1) { // SET
doc.SetObject();
rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
rapidjson::Value val(rapidjson::kObjectType);
doc.AddMember("cmd", "SET", allocator);
doc.AddMember("key", kv_request.key(), allocator);
doc.AddMember("value", kv_request.value(), allocator);
- } else if (kv_request.cmd() == 2) { // GET
+ } else if (kv_request.cmd() == 2) { // GET
doc.SetObject();
rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
rapidjson::Value val(rapidjson::kObjectType);
doc.AddMember("cmd", "GET", allocator);
doc.AddMember("key", kv_request.key(), allocator);
- } else if (kv_request.cmd() == 3) { // GETALLVALUES
+ } else if (kv_request.cmd() == 3) { // GETALLVALUES
doc.SetObject();
rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
rapidjson::Value val(rapidjson::kObjectType);
doc.AddMember("cmd", "GETALLVALUES", allocator);
- } else if (kv_request.cmd() == 4) { // GETRANGE
+ } else if (kv_request.cmd() == 4) { // GETRANGE
doc.SetObject();
rapidjson::Document::AllocatorType &allocator = doc.GetAllocator();
rapidjson::Value val(rapidjson::kObjectType);
@@ -514,19 +546,19 @@ std::string CrowService::ParseKVRequest(const KVRequest
&kv_request) {
std::string CrowService::ParseCreateTime(uint64_t createtime) {
std::string timestr = "";
- uint64_t sec = createtime / 1000000; // see resilientdb/common/utils/utils.cpp
+ uint64_t sec =
+ createtime / 1000000; // see resilientdb/common/utils/utils.cpp
std::tm *tm_gmt = std::gmtime((time_t *)&sec);
int year = tm_gmt->tm_year + 1900;
- int month = tm_gmt->tm_mon; // 0-indexed
+ int month = tm_gmt->tm_mon; // 0-indexed
int day = tm_gmt->tm_mday;
std::string months[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
// Using date time string format to support the Explorer transaction chart
- if (day < 10)
- timestr += "0";
+ if (day < 10) timestr += "0";
timestr += std::to_string(day) + " " + months[month] + " " +
std::to_string(year) + " ";
@@ -534,16 +566,13 @@ std::string CrowService::ParseCreateTime(uint64_t
createtime) {
std::string min_str = std::to_string(tm_gmt->tm_min);
std::string sec_str = std::to_string(tm_gmt->tm_sec);
- if (tm_gmt->tm_hour < 10)
- hour_str = "0" + hour_str;
- if (tm_gmt->tm_min < 10)
- min_str = "0" + min_str;
- if (tm_gmt->tm_sec < 10)
- sec_str = "0" + sec_str;
+ if (tm_gmt->tm_hour < 10) hour_str = "0" + hour_str;
+ if (tm_gmt->tm_min < 10) min_str = "0" + min_str;
+ if (tm_gmt->tm_sec < 10) sec_str = "0" + sec_str;
timestr += hour_str + ":" + min_str + ":" + sec_str + " GMT";
return timestr;
}
-} // namespace sdk
+} // namespace sdk
diff --git a/ecosystem/graphql/service/kv_service/proto/kv_server.proto
b/ecosystem/graphql/service/kv_service/proto/kv_server.proto
index 238a009e..f461f1e0 100644
--- a/ecosystem/graphql/service/kv_service/proto/kv_server.proto
+++ b/ecosystem/graphql/service/kv_service/proto/kv_server.proto
@@ -40,5 +40,6 @@ message KVRequest {
message KVResponse {
string key = 1;
bytes value = 2;
+ uint64 seq = 5;
}
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
index 15edcdd5..2273cf98 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp
@@ -36,6 +36,21 @@ int ResDBKVClient::Set(const std::string &key, const
std::string &data) {
return SendRequest(request);
}
+// Sends a SET request for `key` with payload `data` and reports the `seq`
+// field of the reply.
+// Returns -1 when SendRequest yields a non-zero status; otherwise the value of
+// response.seq() converted to int64_t.
+int64_t ResDBKVClient::SetWithSeq(const std::string &key,
+ const std::string &data) {
+ KVRequest request;
+ request.set_cmd(KVRequest::SET);
+ request.set_key(key);
+ request.set_value(data);
+ KVResponse response;
+ // The response object is passed along so its seq field can be read back.
+ int ret = SendRequest(request, &response);
+ if (ret != 0) {
+ LOG(ERROR) << "send request fail, ret:" << ret;
+ return -1;
+ }
+ return response.seq();
+}
+
std::unique_ptr<std::string> ResDBKVClient::Get(const std::string &key) {
KVRequest request;
request.set_cmd(KVRequest::GET);
@@ -61,9 +76,8 @@ std::unique_ptr<std::string> ResDBKVClient::GetAllValues() {
return std::make_unique<std::string>(response.value());
}
-std::unique_ptr<std::string>
-ResDBKVClient::GetRange(const std::string &min_key,
- const std::string &max_key) {
+std::unique_ptr<std::string> ResDBKVClient::GetRange(
+ const std::string &min_key, const std::string &max_key) {
KVRequest request;
request.set_cmd(KVRequest::GETRANGE);
request.set_key(min_key);
@@ -77,4 +91,4 @@ ResDBKVClient::GetRange(const std::string &min_key,
return std::make_unique<std::string>(response.value());
}
-} // namespace sdk
+} // namespace sdk
diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.h
b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
index e08b41a5..ac35afc0 100644
--- a/ecosystem/graphql/service/kv_service/resdb_kv_client.h
+++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.h
@@ -29,6 +29,7 @@ public:
ResDBKVClient(const resdb::ResDBConfig &config);
int Set(const std::string &key, const std::string &data);
+ int64_t SetWithSeq(const std::string &key, const std::string &data);
std::unique_ptr<std::string> Get(const std::string &key);
std::unique_ptr<std::string> GetAllValues();
std::unique_ptr<std::string> GetRange(const std::string &min_key,
diff --git a/ecosystem/graphql/third_party/BUILD
b/ecosystem/graphql/third_party/BUILD
index 25da0d1f..3e6e468e 100644
--- a/ecosystem/graphql/third_party/BUILD
+++ b/ecosystem/graphql/third_party/BUILD
@@ -35,3 +35,10 @@ cc_library(
"@rapidjson",
],
)
+
+cc_library(
+ name = "json",
+ deps = [
+ "@nlohmann_json//:json",
+ ],
+)
\ No newline at end of file
diff --git a/ecosystem/graphql/third_party/BUILD
b/ecosystem/graphql/third_party/json.BUILD
similarity index 78%
copy from ecosystem/graphql/third_party/BUILD
copy to ecosystem/graphql/third_party/json.BUILD
index 25da0d1f..f0fd8e0b 100644
--- a/ecosystem/graphql/third_party/BUILD
+++ b/ecosystem/graphql/third_party/json.BUILD
@@ -16,22 +16,15 @@
# specific language governing permissions and limitations
# under the License.
#
-#
-
-package(default_visibility = ["//visibility:public"])
-load("@rules_foreign_cc//foreign_cc:defs.bzl", "configure_make", "make")
+licenses(["notice"])
+exports_files(["LICENSE"])
-cc_library(
- name = "crow",
- deps = [
- "@com_crowcpp_crow//:crow",
- ],
-)
+package(default_visibility = ["//visibility:public"])
cc_library(
- name = "rapidjson",
- deps = [
- "@rapidjson",
- ],
-)
+ name = "json",
+ hdrs = glob(["single_include/nlohmann/*.hpp"]),
+ includes = ["single_include"],
+ visibility = ["//visibility:public"],
+)
\ No newline at end of file
diff --git a/executor/common/transaction_manager.cpp
b/executor/common/transaction_manager.cpp
index 74df05c9..fa28f6d0 100644
--- a/executor/common/transaction_manager.cpp
+++ b/executor/common/transaction_manager.cpp
@@ -20,6 +20,7 @@
#include "executor/common/transaction_manager.h"
#include <glog/logging.h>
+#include <sys/types.h>
namespace resdb {
@@ -40,6 +41,11 @@ std::unique_ptr<google::protobuf::Message>
TransactionManager::ParseData(
return nullptr;
}
+// Sequence-aware overload invoked by Prepare(). The base class has no use for
+// the sequence number, so it forwards to the single-argument ParseData();
+// managers that only override that overload therefore still get their parser
+// called instead of silently receiving nullptr.
+std::unique_ptr<google::protobuf::Message> TransactionManager::ParseData(
+    const std::string& data, uint64_t /*seq*/) {
+  return ParseData(data);
+}
+
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
TransactionManager::Prepare(const BatchUserRequest& request) {
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
@@ -48,17 +54,17 @@ TransactionManager::Prepare(const BatchUserRequest&
request) {
{
for (auto& sub_request : request.user_requests()) {
std::unique_ptr<google::protobuf::Message> response =
- ParseData(sub_request.request().data());
+ ParseData(sub_request.request().data(), request.seq());
batch_response->push_back(std::move(response));
}
// LOG(ERROR)<<"prepare data size:"<<batch_response.size();
}
-
return batch_response;
}
+// Default implementation: logs that the base class was reached and produces
+// no response.
std::unique_ptr<std::string> TransactionManager::ExecuteRequest(
const google::protobuf::Message& request) {
+ // glog terminates each record itself, so no std::endl is appended.
+ LOG(ERROR) << "At TransactionManager::ExecuteRequest";
return nullptr;
}
@@ -78,7 +84,6 @@ std::vector<std::unique_ptr<std::string>>
TransactionManager::ExecuteBatchData(
return ret;
}
-
std::unique_ptr<BatchUserResponse> TransactionManager::ExecuteBatch(
const BatchUserRequest& request) {
std::unique_ptr<BatchUserResponse> batch_response =
diff --git a/executor/common/transaction_manager.h
b/executor/common/transaction_manager.h
index b1d564ac..5ea58718 100644
--- a/executor/common/transaction_manager.h
+++ b/executor/common/transaction_manager.h
@@ -56,8 +56,11 @@ class TransactionManager {
protected:
virtual std::unique_ptr<google::protobuf::Message> ParseData(
const std::string& data);
+ virtual std::unique_ptr<google::protobuf::Message> ParseData(
+ const std::string& data,uint64_t seq);
virtual std::unique_ptr<std::string> ExecuteRequest(
const google::protobuf::Message& request);
+
private:
bool is_out_of_order_ = false;
bool need_response_ = true;
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index c587f592..f6fe8b86 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -36,6 +36,18 @@ std::unique_ptr<google::protobuf::Message>
KVExecutor::ParseData(
LOG(ERROR) << "parse data fail";
return nullptr;
}
+ kv_request->set_seq(0);
+ return kv_request;
+}
+
+// Sequence-aware ParseData overload: deserializes `request` into a KVRequest
+// and stamps it with `seq`. Logs and returns nullptr when the bytes do not
+// parse.
+std::unique_ptr<google::protobuf::Message> KVExecutor::ParseData(
+ const std::string& request, uint64_t seq) {
+ std::unique_ptr<KVRequest> kv_request = std::make_unique<KVRequest>();
+ if (!kv_request->ParseFromString(request)) {
+ LOG(ERROR) << "parse data fail";
+ return nullptr;
+ }
+ // Carried on the request so ExecuteRequest can echo it in the SET response.
+ kv_request->set_seq(seq);
return kv_request;
}
@@ -46,6 +58,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteRequest(
if (kv_request.cmd() == KVRequest::SET) {
Set(kv_request.key(), kv_request.value());
+ kv_response.set_seq(kv_request.seq());
} else if (kv_request.cmd() == KVRequest::GET) {
kv_response.set_value(Get(kv_request.key()));
} else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h
index fef12597..19ee9c32 100644
--- a/executor/kv/kv_executor.h
+++ b/executor/kv/kv_executor.h
@@ -19,6 +19,7 @@
#pragma once
+#include <sys/types.h>
#include <map>
#include <optional>
#include <unordered_map>
@@ -38,6 +39,8 @@ class KVExecutor : public TransactionManager {
std::unique_ptr<google::protobuf::Message> ParseData(
const std::string& request) override;
+ std::unique_ptr<google::protobuf::Message> ParseData(
+ const std::string& request,uint64_t seq) override;
std::unique_ptr<std::string> ExecuteRequest(
const google::protobuf::Message& kv_request) override;
protected:
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index a62e55f5..12f4b49a 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -20,6 +20,7 @@
#include "platform/consensus/execution/transaction_executor.h"
#include <glog/logging.h>
+
#include "common/utils/utils.h"
namespace resdb {
@@ -36,7 +37,6 @@ TransactionExecutor::TransactionExecutor(
execute_queue_("execute"),
stop_(false),
duplicate_manager_(nullptr) {
-
memset(blucket_, 0, sizeof(blucket_));
global_stats_ = Stats::GetGlobalStats();
ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
@@ -198,9 +198,9 @@ void TransactionExecutor::OrderMessage() {
}
void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
- global_stats_->IncCommit();
- message->set_commit_time(GetCurrentTime());
- execute_queue_.Push(std::move(message));
+ global_stats_->IncCommit();
+ message->set_commit_time(GetCurrentTime());
+ execute_queue_.Push(std::move(message));
}
void TransactionExecutor::ExecuteMessage() {
@@ -244,7 +244,7 @@ void
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// << batch_request.user_requests_size()<<" proxy
// id:"<<request->proxy_id();
std::unique_ptr<BatchUserResponse> response;
- global_stats_->GetTransactionDetails(batch_request);
+ global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
@@ -258,7 +258,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
RegisterExecute(request->seq());
std::unique_ptr<BatchUserRequest> batch_request = nullptr;
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
data;
- std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
+ std::vector<std::unique_ptr<google::protobuf::Message>>* data_p = nullptr;
BatchUserRequest* batch_request_p = nullptr;
// Execute the request, then send the response back to the user.
@@ -276,7 +276,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
batch_request_p = batch_request.get();
// LOG(ERROR)<<"get data from req:";
} else {
- assert(batch_request_p);
+ assert(batch_request_p);
batch_request_p->set_seq(request->seq());
batch_request_p->set_proxy_id(request->proxy_id());
// LOG(ERROR)<<" get from cache:"<<uid;
@@ -295,38 +295,32 @@ void
TransactionExecutor::Execute(std::unique_ptr<Request> request,
} else {
std::vector<std::unique_ptr<std::string>> response_v;
- if(data_p == nullptr) {
- int64_t start_time = GetCurrentTime();
- data = std::move(transaction_manager_->Prepare(*batch_request_p));
- int64_t end_time = GetCurrentTime();
- if (end_time - start_time > 10) {
- // LOG(ERROR)<<"exec data done:"<<uid<<" wait
- // time:"<<(end_time-start_time);
- }
+ if (data_p == nullptr) {
+ data = transaction_manager_->Prepare(*batch_request_p);
data_p = data.get();
}
WaitForExecute(request->seq());
- if(data_p->empty() || (*data_p)[0] == nullptr){
- response =
transaction_manager_->ExecuteBatch(*batch_request_p);
- }
- else {
- response_v =
transaction_manager_->ExecuteBatchData(*data_p);
- }
+ if (data_p->empty() || (*data_p)[0] == nullptr) {
+ response = transaction_manager_->ExecuteBatch(*batch_request_p);
+ } else {
+ response_v = transaction_manager_->ExecuteBatchData(*data_p);
+ }
FinishExecute(request->seq());
- if(response == nullptr){
- response = std::make_unique<BatchUserResponse>();
- for (auto& s : response_v) {
- response->add_response()->swap(*s);
- }
+ if (response == nullptr) {
+ response = std::make_unique<BatchUserResponse>();
+ for (auto& s : response_v) {
+ response->add_response()->swap(*s);
+ }
}
}
}
// LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
- if (duplicate_manager_ && batch_request_p) {
- duplicate_manager_->AddExecuted(batch_request_p->hash(),
batch_request_p->seq());
+ if (duplicate_manager_ && batch_request_p) {
+ duplicate_manager_->AddExecuted(batch_request_p->hash(),
+ batch_request_p->seq());
}
if (response == nullptr) {
@@ -334,13 +328,14 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
}
global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
response->set_proxy_id(batch_request_p->proxy_id());
- response->set_createtime(batch_request_p->createtime() + request->queuing_time());
+ response->set_createtime(batch_request_p->createtime() +
+ request->queuing_time());
response->set_local_id(batch_request_p->local_id());
response->set_seq(request->seq());
if (post_exec_func_) {
- post_exec_func_(std::move(request), std::move(response));
+ post_exec_func_(std::move(request), std::move(response)); //TODO: request_tracking
}
global_stats_->IncExecuteDone();
@@ -350,7 +345,6 @@ void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
duplicate_manager_ = manager;
}
-
bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
auto it = flag_[uid % mod].find(uid);
@@ -364,9 +358,9 @@ bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
if (flag_[uid % mod][uid] & Start_Execute) {
return false;
}
- } else if(f == Start_Execute){
+ } else if (f == Start_Execute) {
if (flag_[uid % mod][uid] & End_Prepare) {
- //if (flag_[uid % mod][uid] & Start_Prepare) {
+ // if (flag_[uid % mod][uid] & Start_Prepare) {
return false;
}
}
@@ -383,10 +377,10 @@ void TransactionExecutor::ClearPromise(uint64_t uid) {
// LOG(ERROR)<<"CLEAR UID:"<<uid;
assert(it != pre_[uid % mod].end());
assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
- //assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
- //assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
- //data_[uid%mod].erase(data_[uid%mod].find(uid));
- //req_[uid%mod].erase(req_[uid%mod].find(uid));
+ // assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+ // assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
+ // data_[uid%mod].erase(data_[uid%mod].find(uid));
+ // req_[uid%mod].erase(req_[uid%mod].find(uid));
pre_[uid % mod].erase(it);
flag_[uid % mod].erase(flag_[uid % mod].find(uid));
}
@@ -406,8 +400,8 @@ std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid) {
if (it == pre_[uid % mod].end()) {
return nullptr;
}
- //return std::move(it->second);
- // LOG(ERROR)<<"add future:"<<uid;
+ // return std::move(it->second);
+ // LOG(ERROR)<<"add future:"<<uid;
return std::make_unique<std::future<int>>(it->second->get_future());
}
@@ -418,9 +412,9 @@ bool TransactionExecutor::AddFuture(uint64_t uid) {
// LOG(ERROR)<<"add future:"<<uid;
std::unique_ptr<std::promise<int>> p =
std::make_unique<std::promise<int>>();
- //auto f = std::make_unique<std::future<int>>(p->get_future());
+ // auto f = std::make_unique<std::future<int>>(p->get_future());
pre_[uid % mod][uid] = std::move(p);
- //pre_f_[uid % mod][uid] = std::move(f);
+ // pre_f_[uid % mod][uid] = std::move(f);
flag_[uid % mod][uid] = 0;
return true;
}
@@ -445,13 +439,13 @@ void TransactionExecutor::PrepareMessage() {
if (current_f == 0) {
// commit has done
// LOG(ERROR)<<" want prepare, commit started:"<<uid;
-// ClearPromise(uid);
+ // ClearPromise(uid);
continue;
}
- std::promise<int>* p = GetPromise(uid) ;
+ std::promise<int>* p = GetPromise(uid);
assert(p);
- //LOG(ERROR)<<" prepare started:"<<uid;
+ // LOG(ERROR)<<" prepare started:"<<uid;
// LOG(ERROR)<<" prepare uid:"<<uid;
@@ -473,27 +467,26 @@ void TransactionExecutor::PrepareMessage() {
// id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
- request_v = transaction_manager_->Prepare(*batch_request);
+ request_v = transaction_manager_->Prepare(*batch_request);
{
std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
- // assert(request_v);
+ // assert(request_v);
// assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
- data_[uid%mod][uid] = std::move(request_v);
+ data_[uid % mod][uid] = std::move(request_v);
req_[uid % mod][uid] = std::move(batch_request);
}
- //LOG(ERROR)<<"set promise:"<<uid;
+ // LOG(ERROR)<<"set promise:"<<uid;
p->set_value(1);
{
int set_ret = SetFlag(uid, End_Prepare);
if (set_ret == 0) {
// LOG(ERROR)<<"commit interrupt:"<<uid;
- //ClearPromise(uid);
+ // ClearPromise(uid);
} else {
- //LOG(ERROR)<<"prepare done:"<<uid;
+ // LOG(ERROR)<<"prepare done:"<<uid;
}
}
}
}
-
} // namespace resdb
diff --git a/proto/kv/kv.proto b/proto/kv/kv.proto
index 750058e7..d24b5fbd 100644
--- a/proto/kv/kv.proto
+++ b/proto/kv/kv.proto
@@ -48,6 +48,7 @@ message KVRequest {
// For top history
int32 top_number = 9;
bytes smart_contract_request = 10;
+ uint64 seq = 11;
}
message ValueInfo {
@@ -69,6 +70,7 @@ message KVResponse {
bytes value = 2;
ValueInfo value_info = 3;
Items items = 4;
+ uint64 seq = 5;
bytes smart_contract_response = 10;
}
diff --git a/service/tools/kv/server_tools/generate_keys_and_certs.sh b/service/tools/kv/server_tools/generate_keys_and_certs.sh
index 94fb4aab..261bbb79 100755
--- a/service/tools/kv/server_tools/generate_keys_and_certs.sh
+++ b/service/tools/kv/server_tools/generate_keys_and_certs.sh
@@ -31,4 +31,4 @@ CONFIG_PATH=$PWD/service/tools/config/
PORT_BASE=10000
CLIENT_NUM=1
-./service/tools/config/generate_keys_and_certs.sh ${WORKSPACE} ${CERT_PATH} ${CERT_PATH} ${CONFIG_PATH} ${CERT_PATH} ${CLIENT_NUM} ${PORT_BASE} ${iplist[@]}
+./service/tools/config/generate_keys_and_certs.sh ${WORKSPACE} ${CERT_PATH} ${CERT_PATH} ${PORT_BASE} ${CLIENT_NUM} ${iplist[@]}