This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 9a49bece0d80f3843004a10fef8e463b6c9bec4f Author: bchou9 <[email protected]> AuthorDate: Wed Jan 7 19:24:54 2026 +0000 Enhance KV service with sequence tracking and update Bazel configurations - Introduced sequence number tracking in KV responses and requests. - Added new methods in ResDBKVClient for setting and getting values with sequence numbers. - Updated CrowService to handle new transaction routes for v2, including error handling. - Modified Bazel configurations to include profiling options and set profile as default. - Updated service configuration files for consistency in replica information. --- .bazelrc | 5 + ecosystem/graphql/.bazelrc | 10 +- .../graphql/service/http_server/crow_service.cpp | 249 +++++++++++++-------- .../service/kv_service/proto/kv_server.proto | 1 + .../graphql/service/kv_service/resdb_kv_client.cpp | 31 +++ .../graphql/service/kv_service/resdb_kv_client.h | 3 + executor/kv/kv_executor.cpp | 2 + proto/kv/kv.proto | 1 + service/tools/config/interface/service.config | 2 +- 9 files changed, 214 insertions(+), 90 deletions(-) diff --git a/.bazelrc b/.bazelrc index ecfcecae..3331ef4b 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,6 +1,11 @@ +# Default build configuration (optimized) build --cxxopt='-std=c++17' --copt=-O3 --jobs=40 +# Profile build configuration (debug symbols, no optimization) build:profile --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 + +# Apply profile config by default +build --config=profile #build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" #build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10" diff --git a/ecosystem/graphql/.bazelrc b/ecosystem/graphql/.bazelrc index 3d06b30e..3331ef4b 100644 --- a/ecosystem/graphql/.bazelrc +++ b/ecosystem/graphql/.bazelrc @@ -1,3 +1,11 @@ +# Default build configuration (optimized) build --cxxopt='-std=c++17' --copt=-O3 --jobs=40 -build:profile --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 \ No newline at end of file +# Profile build configuration (debug symbols, no optimization) +build:profile --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 + +# Apply profile config by default +build --config=profile +#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" +#build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10" + diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp b/ecosystem/graphql/service/http_server/crow_service.cpp index b45cd273..b354ae9b 100644 --- a/ecosystem/graphql/service/http_server/crow_service.cpp +++ b/ecosystem/graphql/service/http_server/crow_service.cpp @@ -30,6 +30,7 @@ #include <chrono> #include <ctime> +#include <exception> #include <fstream> #include <memory> #include <mutex> @@ -55,8 +56,7 @@ CrowService::CrowService(ResDBConfig client_config, ResDBConfig server_config, server_config_(server_config), port_num_(port_num), kv_client_(client_config_), - state_client_(client_config_) { -} + state_client_(client_config_) {} void CrowService::run() { crow::SimpleApp app; @@ -67,7 +67,8 @@ void CrowService::run() { // Get all values // CROW_ROUTE(app, "/v1/transactions") // ([this](const crow::request &req, response &res) { - // uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( + // uint64_t cur_time = + // std::chrono::duration_cast<std::chrono::milliseconds>( // std::chrono::system_clock::now().time_since_epoch()) // .count(); @@ -125,6 +126,35 @@ void CrowService::run() { } }); + CROW_ROUTE(app, "/v2/transactions/<string>") + ([this](const crow::request &req, std::string id) { + try { + auto value = kv_client_.GetValueWithSeq(id); + if (value != nullptr) { + LOG(INFO) << "client get value = " << value->first.c_str(); + crow::json::wvalue resp; + resp["value"] = value->first.c_str(); + resp["seq"] = value->second; + + response res(201, resp); + res.set_header("Content-Type", "application/json"); + return res; + } else { + crow::json::wvalue resp; + resp["error"] = "value not found"; + response res(404, resp); + res.set_header("Content-Type", "application/json"); + return res; + } + } catch (...) { + crow::json::wvalue resp; + resp["error"] = "get value fail"; + response res(500, resp); + res.set_header("Content-Type", "application/json"); + return res; + } + }); + // Get values based on key range CROW_ROUTE(app, "/v1/transactions/<string>/<string>") ([this](const crow::request &req, response &res, std::string min_id, @@ -190,6 +220,43 @@ void CrowService::run() { return res; }); + CROW_ROUTE(app, "/v2/transactions/commit") + .methods("POST"_method)([this](const request &req) { + try { + std::string body = req.body; + LOG(INFO) << "body: " << body; + + // Parse transaction JSON + rapidjson::Document doc; + doc.Parse(body.c_str()); + if (!doc.IsObject() || !doc.HasMember("id")) { + response res(400, "Invalid transaction format"); // Bad Request + res.set_header("Content-Type", "text/plain"); + return res; + } + const std::string id = doc["id"].GetString(); + const std::string value = body; + + // Set key-value pair in kv server + uint64_t seq_number = kv_client_.SetWithSeq(id, value); + crow::json::wvalue resp; + resp["id"] = id; + resp["seq"] = seq_number; + + response res(201, resp); + res.set_header("Content-Type", "application/json"); + return res; + } catch (const std::exception &e) { + LOG(ERROR) << "Exception in /v2/transactions/commit: " << e.what(); + crow::json::wvalue error_resp; + error_resp["error"] = "Internal server error"; + error_resp["message"] = e.what(); + response res(500, error_resp); + res.set_header("Content-Type", "application/json"); + return res; + } + }); + // CROW_ROUTE(app, "/v1/blocks") // ([this](const crow::request &req, response &res) { // auto values = GetAllBlocks(100); @@ -213,7 +280,8 @@ void CrowService::run() { // Retrieve blocks within a range // CROW_ROUTE(app, "/v1/blocks/<int>/<int>") - // ([this](const crow::request &req, response &res, int min_seq, int max_seq) { + // ([this](const crow::request &req, response &res, int min_seq, int max_seq) + // { // auto resp = txn_client_.GetTxn(min_seq, max_seq); // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( // uint64_t min_seq, uint64_t max_seq); @@ -243,7 +311,8 @@ void CrowService::run() { // cur_batch_str.append("{\"id\": " + std::to_string(seq)); // // // number - // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); + // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + + // "\""); // // // transactions // cur_batch_str.append(", \"transactions\": ["); @@ -326,7 +395,8 @@ void CrowService::run() { // if (first_commit_time_ == 0) { // // Get first block in the chain // auto resp = txn_client_.GetTxn(1, 1); - // // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> + // GetTxn( // // uint64_t min_seq, uint64_t max_seq); // if (!resp.ok()) { // LOG(ERROR) << "get replica state fail"; @@ -376,7 +446,8 @@ void CrowService::run() { // ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) + // ", \"maxMaliciousReplicaNum\" : " + // std::to_string(max_malicious_replica_num) + - // ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) + + // ", \"checkpointWaterMark\" : " + + // std::to_string(checkpoint_water_mark) + // ", \"transactionNum\" : " + std::to_string(num_transactions_) + // ", \"blockNum\" : " + std::to_string(*block_num_resp) + // ", \"chainAge\" : " + std::to_string(chain_age) + "}]"); @@ -389,87 +460,89 @@ void CrowService::run() { app.port(port_num_).multithreaded().run(); } - // If batch_size is 1, the function will not add the extra outer [] braces - // Otherwise, a list of lists of blocks will be returned - // std::string CrowService::GetAllBlocks(int batch_size, bool increment_txn_count, - // bool make_sublists) { - // int min_seq = 1; - // bool full_batches = true; - // - // std::string values = "[\n"; - // bool first_batch = true; - // while (full_batches) { - // std::string cur_batch_str = ""; - // if (!first_batch) cur_batch_str.append(",\n"); - // if (batch_size > 1 && make_sublists) cur_batch_str.append("["); - // first_batch = false; - // - // int max_seq = min_seq + batch_size - 1; - // auto resp = txn_client_.GetTxn(min_seq, max_seq); - // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - // uint64_t min_seq, uint64_t max_seq); - // if (!resp.ok()) { - // LOG(ERROR) << "get replica txn fail"; - // return ""; - // }; - // - // int cur_size = 0; - // bool first_batch_element = true; - // for (auto &txn : *resp) { - // BatchUserRequest request; - // KVRequest kv_request; - // cur_size++; - // if (request.ParseFromString(txn.second)) { - // if (!first_batch_element) cur_batch_str.append(","); - // - // first_batch_element = false; - // - // // id - // uint64_t seq = txn.first; - // cur_batch_str.append("{\"id\": " + std::to_string(seq)); - // - // // number - // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); - // - // // transactions - // cur_batch_str.append(", \"transactions\": ["); - // bool first_transaction = true; - // for (auto &sub_req : request.user_requests()) { - // kv_request.ParseFromString(sub_req.request().data()); - // std::string kv_request_json = ParseKVRequest(kv_request); - // - // if (!first_transaction) cur_batch_str.append(","); - // first_transaction = false; - // cur_batch_str.append(kv_request_json); - // cur_batch_str.append("\n"); - // - // if (increment_txn_count) num_transactions_++; - // } - // cur_batch_str.append("]"); // close transactions list - // - // // size - // cur_batch_str.append(", \"size\": " + - // std::to_string(request.ByteSizeLong())); - // - // // createdAt - // uint64_t createtime = request.createtime(); - // cur_batch_str.append(", \"createdAt\": \"" + - // ParseCreateTime(createtime) + "\""); - // } - // cur_batch_str.append("}\n"); - // } - // full_batches = cur_size == batch_size; - // if (batch_size > 1 && make_sublists) cur_batch_str.append("]"); - // - // if (cur_size > 0) values.append(cur_batch_str); - // - // min_seq += batch_size; - // } - // - // values.append("\n]\n"); - // - // return values; - // } +// If batch_size is 1, the function will not add the extra outer [] braces +// Otherwise, a list of lists of blocks will be returned +// std::string CrowService::GetAllBlocks(int batch_size, bool +// increment_txn_count, +// bool make_sublists) { +// int min_seq = 1; +// bool full_batches = true; +// +// std::string values = "[\n"; +// bool first_batch = true; +// while (full_batches) { +// std::string cur_batch_str = ""; +// if (!first_batch) cur_batch_str.append(",\n"); +// if (batch_size > 1 && make_sublists) cur_batch_str.append("["); +// first_batch = false; +// +// int max_seq = min_seq + batch_size - 1; +// auto resp = txn_client_.GetTxn(min_seq, max_seq); +// absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( +// uint64_t min_seq, uint64_t max_seq); +// if (!resp.ok()) { +// LOG(ERROR) << "get replica txn fail"; +// return ""; +// }; +// +// int cur_size = 0; +// bool first_batch_element = true; +// for (auto &txn : *resp) { +// BatchUserRequest request; +// KVRequest kv_request; +// cur_size++; +// if (request.ParseFromString(txn.second)) { +// if (!first_batch_element) cur_batch_str.append(","); +// +// first_batch_element = false; +// +// // id +// uint64_t seq = txn.first; +// cur_batch_str.append("{\"id\": " + std::to_string(seq)); +// +// // number +// cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + +// "\""); +// +// // transactions +// cur_batch_str.append(", \"transactions\": ["); +// bool first_transaction = true; +// for (auto &sub_req : request.user_requests()) { +// kv_request.ParseFromString(sub_req.request().data()); +// std::string kv_request_json = ParseKVRequest(kv_request); +// +// if (!first_transaction) cur_batch_str.append(","); +// first_transaction = false; +// cur_batch_str.append(kv_request_json); +// cur_batch_str.append("\n"); +// +// if (increment_txn_count) num_transactions_++; +// } +// cur_batch_str.append("]"); // close transactions list +// +// // size +// cur_batch_str.append(", \"size\": " + +// std::to_string(request.ByteSizeLong())); +// +// // createdAt +// uint64_t createtime = request.createtime(); +// cur_batch_str.append(", \"createdAt\": \"" + +// ParseCreateTime(createtime) + "\""); +// } +// cur_batch_str.append("}\n"); +// } +// full_batches = cur_size == batch_size; +// if (batch_size > 1 && make_sublists) cur_batch_str.append("]"); +// +// if (cur_size > 0) values.append(cur_batch_str); +// +// min_seq += batch_size; +// } +// +// values.append("\n]\n"); +// +// return values; +// } // Helper function used by the blocks endpoints to create JSON strings std::string CrowService::ParseKVRequest(const KVRequest &kv_request) { diff --git a/ecosystem/graphql/service/kv_service/proto/kv_server.proto b/ecosystem/graphql/service/kv_service/proto/kv_server.proto index 238a009e..f461f1e0 100644 --- a/ecosystem/graphql/service/kv_service/proto/kv_server.proto +++ b/ecosystem/graphql/service/kv_service/proto/kv_server.proto @@ -40,5 +40,6 @@ message KVRequest { message KVResponse { string key = 1; bytes value = 2; + uint64 seq = 5; } diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp index 9ff55d41..d8fd5aca 100644 --- a/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp +++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.cpp @@ -36,6 +36,22 @@ int ResDBKVClient::Set(const std::string &key, const std::string &data) { return SendRequest(request); } +uint64_t ResDBKVClient::SetWithSeq(const std::string &key, + const std::string &data) { + KVRequest request; + request.set_cmd(KVRequest::SET); + request.set_key(key); + request.set_value(data); + KVResponse response; + int ret = SendRequest(request, &response); + if (ret != 0) { + LOG(ERROR) << "send request fail, ret:" << ret; + return -1; + } + LOG(ERROR) << "Sequence Number: " << response.seq(); + return response.seq(); +} + std::unique_ptr<std::string> ResDBKVClient::Get(const std::string &key) { KVRequest request; request.set_cmd(KVRequest::GET); @@ -49,6 +65,21 @@ std::unique_ptr<std::string> ResDBKVClient::Get(const std::string &key) { return std::make_unique<std::string>(response.value()); } +std::unique_ptr<std::pair<std::string, uint64_t>> +ResDBKVClient::GetValueWithSeq(const std::string &key) { + KVRequest request; + request.set_cmd(KVRequest::GET); + request.set_key(key); + KVResponse response; + int ret = SendRequest(request, &response); + if (ret != 0) { + LOG(ERROR) << "send request fail, ret:" << ret; + return nullptr; + } + return std::make_unique<std::pair<std::string, uint64_t>>( + response.value(), response.seq()); +} + std::unique_ptr<std::string> ResDBKVClient::GetAllValues() { KVRequest request; request.set_cmd(KVRequest::GETALLVALUES); diff --git a/ecosystem/graphql/service/kv_service/resdb_kv_client.h b/ecosystem/graphql/service/kv_service/resdb_kv_client.h index aa9ea1ac..7370709e 100644 --- a/ecosystem/graphql/service/kv_service/resdb_kv_client.h +++ b/ecosystem/graphql/service/kv_service/resdb_kv_client.h @@ -29,7 +29,10 @@ class ResDBKVClient : public resdb::TransactionConstructor { ResDBKVClient(const resdb::ResDBConfig &config); int Set(const std::string &key, const std::string &data); + uint64_t SetWithSeq(const std::string &key, const std::string &data); std::unique_ptr<std::string> Get(const std::string &key); + std::unique_ptr<std::pair<std::string, uint64_t>> GetValueWithSeq( + const std::string &key); std::unique_ptr<std::string> GetAllValues(); std::unique_ptr<std::string> GetRange(const std::string &min_key, const std::string &max_key); diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp index 719093a4..f6e48cfb 100644 --- a/executor/kv/kv_executor.cpp +++ b/executor/kv/kv_executor.cpp @@ -49,8 +49,10 @@ std::unique_ptr<std::string> KVExecutor::ExecuteRequest( if (kv_request.cmd() == KVRequest::SET) { Set(kv_request.key(), kv_request.value()); + kv_response.set_seq(seq_); } else if (kv_request.cmd() == KVRequest::GET) { kv_response.set_value(Get(kv_request.key())); + kv_response.set_seq(seq_); } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { kv_response.set_value(GetAllValues()); } else if (kv_request.cmd() == KVRequest::GETRANGE) { diff --git a/proto/kv/kv.proto b/proto/kv/kv.proto index 750058e7..a8b45c28 100644 --- a/proto/kv/kv.proto +++ b/proto/kv/kv.proto @@ -69,6 +69,7 @@ message KVResponse { bytes value = 2; ValueInfo value_info = 3; Items items = 4; + uint64 seq = 5; bytes smart_contract_response = 10; } diff --git a/service/tools/config/interface/service.config b/service/tools/config/interface/service.config index 70bad59b..56d0a94c 100644 --- a/service/tools/config/interface/service.config +++ b/service/tools/config/interface/service.config @@ -19,7 +19,7 @@ { "replica_info": [ { - "id": 5, + "id": 1, "ip": "127.0.0.1", "port": 10005 },
