This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 893a8025ad21ca56c258dc90a75c40fd5610bf0a Author: bchou9 <[email protected]> AuthorDate: Wed Jan 7 17:05:44 2026 +0000 Update Bazel configurations and service files - Added new Bazel configuration for GraphQL ecosystem. - Updated existing Bazel configurations to include profiling options. - Modified .gitignore to exclude additional database files. - Refactored CrowService to comment out unused routes and removed txn_client_ member. - Updated service configuration files to use JSON format for replica information. - Introduced scripts for starting and stopping replicas. - Added new dependencies for JSON handling in the project. --- .bazelrc | 2 + .gitignore | 3 +- ecosystem/graphql/.bazelrc | 3 + ecosystem/graphql/WORKSPACE | 29 ++ .../graphql/service/http_server/crow_service.cpp | 578 ++++++++++----------- .../graphql/service/http_server/crow_service.h | 1 - .../service/http_server/server_config.config | 27 +- .../service/tools/config/interface/client.config | 11 +- .../service/tools/config/interface/service.config | 11 +- ecosystem/graphql/third_party/BUILD | 7 + .../graphql/third_party/{BUILD => json.BUILD} | 23 +- service/tools/config/interface/service.config | 17 +- service/tools/config/server.config | 24 +- service/tools/config/server/server.config | 18 +- service/tools/kv/server_tools/generate_config.sh | 2 +- service/tools/kv/server_tools/start_replica.sh | 61 +++ service/tools/kv/server_tools/stop_replica.sh | 31 ++ 17 files changed, 497 insertions(+), 351 deletions(-) diff --git a/.bazelrc b/.bazelrc index cc3d1af5..ecfcecae 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,4 +1,6 @@ build --cxxopt='-std=c++17' --copt=-O3 --jobs=40 + +build:profile --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 #build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" #build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10" diff --git a/.gitignore b/.gitignore index 6d1b093f..be0ae77a 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,5 @@ apache_release resdb/ 100*_db/ gmon.out -.history/ \ No newline at end of file +.history/ +*_db/ \ No newline at end of file diff --git a/ecosystem/graphql/.bazelrc b/ecosystem/graphql/.bazelrc new file mode 100644 index 00000000..3d06b30e --- /dev/null +++ b/ecosystem/graphql/.bazelrc @@ -0,0 +1,3 @@ +build --cxxopt='-std=c++17' --copt=-O3 --jobs=40 + +build:profile --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 \ No newline at end of file diff --git a/ecosystem/graphql/WORKSPACE b/ecosystem/graphql/WORKSPACE index 8ebac612..f33c54d9 100644 --- a/ecosystem/graphql/WORKSPACE +++ b/ecosystem/graphql/WORKSPACE @@ -19,6 +19,22 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") +http_archive( + name = "hedron_compile_commands", + #Replace the commit hash (4f28899228fb3ad0126897876f147ca15026151e) with the latest commit hash from the repo + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/4f28899228fb3ad0126897876f147ca15026151e.tar.gz", + strip_prefix = "bazel-compile-commands-extractor-4f28899228fb3ad0126897876f147ca15026151e", +) +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") +hedron_compile_commands_setup() +load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive") +hedron_compile_commands_setup_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive_transitive() + + http_archive( name = "rules_foreign_cc", sha256 = "69023642d5781c68911beda769f91fcbc8ca48711db935a75da7f6536b65047f", @@ -238,6 +254,19 @@ http_archive( ], ) +http_archive( + name = "nlohmann_json", + build_file = "//third_party:json.BUILD", + sha256 = "95651d7d1fcf2e5c3163c3d37df6d6b3e9e5027299e6bd050d157322ceda9ac9", + strip_prefix = "json-3.11.2", + url = "https://github.com/nlohmann/json/archive/refs/tags/v3.11.2.zip", +) + +bind( + name = "json", + actual = "@nlohmann_json//:json", +) + load("@com_resdb_nexres//:repositories.bzl", "nexres_repositories") nexres_repositories() diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp b/ecosystem/graphql/service/http_server/crow_service.cpp index e7054634..b45cd273 100644 --- a/ecosystem/graphql/service/http_server/crow_service.cpp +++ b/ecosystem/graphql/service/http_server/crow_service.cpp @@ -55,9 +55,7 @@ CrowService::CrowService(ResDBConfig client_config, ResDBConfig server_config, server_config_(server_config), port_num_(port_num), kv_client_(client_config_), - state_client_(client_config_), - txn_client_(server_config_) { - GetAllBlocks(100, true, false); + state_client_(client_config_) { } void CrowService::run() { @@ -67,42 +65,42 @@ void CrowService::run() { std::mutex mtx; // Get all values - CROW_ROUTE(app, "/v1/transactions") - ([this](const crow::request &req, response &res) { - uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) { - res.code = 503; - res.set_header("Content-Type", "text/plain"); - res.end( - "Get all transactions functionality on cooldown (" + - std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) + - " ms left)"); - } else { - last_db_scan_time = cur_time; - - auto values = kv_client_.GetAllValues(); - if (values != nullptr) { - // LOG(INFO) << "client getallvalues value = " << values->c_str(); - - // Send updated blocks list to websocket - if (users.size() > 0) { - for (auto u : users) u->send_text("Update blocks"); - } - - num_transactions_++; - - res.set_header("Content-Type", "application/json"); - res.end(std::string(values->c_str())); - } else { - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("getallvalues fail"); - } - } - }); + // CROW_ROUTE(app, "/v1/transactions") + // ([this](const crow::request &req, response &res) { + // uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( + // std::chrono::system_clock::now().time_since_epoch()) + // .count(); + + // if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) { + // res.code = 503; + // res.set_header("Content-Type", "text/plain"); + // res.end( + // "Get all transactions functionality on cooldown (" + + // std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) + + // " ms left)"); + // } else { + // last_db_scan_time = cur_time; + + // auto values = kv_client_.GetAllValues(); + // if (values != nullptr) { + // // LOG(INFO) << "client getallvalues value = " << values->c_str(); + + // // Send updated blocks list to websocket + // if (users.size() > 0) { + // for (auto u : users) u->send_text("Update blocks"); + // } + + // num_transactions_++; + + // res.set_header("Content-Type", "application/json"); + // res.end(std::string(values->c_str())); + // } else { + // res.code = 500; + // res.set_header("Content-Type", "text/plain"); + // res.end("getallvalues fail"); + // } + // } + // }); // Get value of specific id CROW_ROUTE(app, "/v1/transactions/<string>") @@ -192,91 +190,91 @@ void CrowService::run() { return res; }); - CROW_ROUTE(app, "/v1/blocks") - ([this](const crow::request &req, response &res) { - auto values = GetAllBlocks(100); - res.set_header("Content-Type", "application/json"); - res.end(values); - }); + // CROW_ROUTE(app, "/v1/blocks") + // ([this](const crow::request &req, response &res) { + // auto values = GetAllBlocks(100); + // res.set_header("Content-Type", "application/json"); + // res.end(values); + // }); // Retrieve blocks in batches of size of the int parameter - CROW_ROUTE(app, "/v1/blocks/<int>") - ([this](const crow::request &req, response &res, int batch_size) { - auto values = GetAllBlocks(batch_size, false, true); - if (values == "") { - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("get replica state fail"); - exit(1); - }; - res.set_header("Content-Type", "application/json"); - res.end(values); - }); + // CROW_ROUTE(app, "/v1/blocks/<int>") + // ([this](const crow::request &req, response &res, int batch_size) { + // auto values = GetAllBlocks(batch_size, false, true); + // if (values == "") { + // res.code = 500; + // res.set_header("Content-Type", "text/plain"); + // res.end("get replica state fail"); + // exit(1); + // }; + // res.set_header("Content-Type", "application/json"); + // res.end(values); + // }); // Retrieve blocks within a range - CROW_ROUTE(app, "/v1/blocks/<int>/<int>") - ([this](const crow::request &req, response &res, int min_seq, int max_seq) { - auto resp = txn_client_.GetTxn(min_seq, max_seq); - absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - uint64_t min_seq, uint64_t max_seq); - if (!resp.ok()) { - LOG(ERROR) << "get replica state fail"; - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("get replica state fail"); - exit(1); - } - - std::string values = "[\n"; - bool first_iteration = true; - for (auto &txn : *resp) { - BatchUserRequest request; - KVRequest kv_request; - - std::string cur_batch_str = ""; - if (request.ParseFromString(txn.second)) { - LOG(INFO) << request.DebugString(); - - if (!first_iteration) cur_batch_str.append(","); - first_iteration = false; - - // id - uint64_t seq = txn.first; - cur_batch_str.append("{\"id\": " + std::to_string(seq)); - - // number - cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); - - // transactions - cur_batch_str.append(", \"transactions\": ["); - bool first_transaction = true; - for (auto &sub_req : request.user_requests()) { - kv_request.ParseFromString(sub_req.request().data()); - std::string kv_request_json = ParseKVRequest(kv_request); - - if (!first_transaction) cur_batch_str.append(","); - first_transaction = false; - cur_batch_str.append(kv_request_json); - cur_batch_str.append("\n"); - } - cur_batch_str.append("]"); // close transactions list - - // size - cur_batch_str.append(", \"size\": " + - std::to_string(request.ByteSizeLong())); - - // createdAt - uint64_t createtime = request.createtime(); - cur_batch_str.append(", \"createdAt\": \"" + - ParseCreateTime(createtime) + "\""); - } - cur_batch_str.append("}\n"); - values.append(cur_batch_str); - } - values.append("]\n"); - res.set_header("Content-Type", "application/json"); - res.end(values); - }); + // CROW_ROUTE(app, "/v1/blocks/<int>/<int>") + // ([this](const crow::request &req, response &res, int min_seq, int max_seq) { + // auto resp = txn_client_.GetTxn(min_seq, max_seq); + // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // uint64_t min_seq, uint64_t max_seq); + // if (!resp.ok()) { + // LOG(ERROR) << "get replica state fail"; + // res.code = 500; + // res.set_header("Content-Type", "text/plain"); + // res.end("get replica state fail"); + // exit(1); + // } + // + // std::string values = "[\n"; + // bool first_iteration = true; + // for (auto &txn : *resp) { + // BatchUserRequest request; + // KVRequest kv_request; + // + // std::string cur_batch_str = ""; + // if (request.ParseFromString(txn.second)) { + // LOG(INFO) << request.DebugString(); + // + // if (!first_iteration) cur_batch_str.append(","); + // first_iteration = false; + // + // // id + // uint64_t seq = txn.first; + // cur_batch_str.append("{\"id\": " + std::to_string(seq)); + // + // // number + // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); + // + // // transactions + // cur_batch_str.append(", \"transactions\": ["); + // bool first_transaction = true; + // for (auto &sub_req : request.user_requests()) { + // kv_request.ParseFromString(sub_req.request().data()); + // std::string kv_request_json = ParseKVRequest(kv_request); + // + // if (!first_transaction) cur_batch_str.append(","); + // first_transaction = false; + // cur_batch_str.append(kv_request_json); + // cur_batch_str.append("\n"); + // } + // cur_batch_str.append("]"); // close transactions list + // + // // size + // cur_batch_str.append(", \"size\": " + + // std::to_string(request.ByteSizeLong())); + // + // // createdAt + // uint64_t createtime = request.createtime(); + // cur_batch_str.append(", \"createdAt\": \"" + + // ParseCreateTime(createtime) + "\""); + // } + // cur_batch_str.append("}\n"); + // values.append(cur_batch_str); + // } + // values.append("]\n"); + // res.set_header("Content-Type", "application/json"); + // res.end(values); + // }); CROW_ROUTE(app, "/blockupdatelistener") .websocket() @@ -297,181 +295,181 @@ void CrowService::run() { }); // For metadata table on the Explorer - CROW_ROUTE(app, "/populatetable") - ([this](const crow::request &req, response &res) { - absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState(); - if (!state_or.ok()) { - LOG(ERROR) << "get state fail"; - res.set_header("Content-Type", "application/json"); - res.end(""); - return; - } - - ResConfigData config_data = (*state_or).replica_config(); - ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), - CertificateInfo()); - - uint32_t replica_num = server_config.GetReplicaNum(); - uint32_t worker_num = server_config.GetWorkerNum(); - uint32_t client_batch_num = server_config.ClientBatchNum(); - uint32_t max_process_txn = server_config.GetMaxProcessTxn(); - uint32_t client_batch_wait_time = server_config.ClientBatchWaitTimeMS(); - uint32_t input_worker_num = server_config.GetInputWorkerNum(); - uint32_t output_worker_num = server_config.GetOutputWorkerNum(); - int client_timeout_ms = server_config.GetClientTimeoutMs(); - int min_data_receive_num = server_config.GetMinDataReceiveNum(); - size_t max_malicious_replica_num = - server_config.GetMaxMaliciousReplicaNum(); - int checkpoint_water_mark = server_config.GetCheckPointWaterMark(); - - // Don't read the ledger if first commit time is known - if (first_commit_time_ == 0) { - // Get first block in the chain - auto resp = txn_client_.GetTxn(1, 1); - // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - // uint64_t min_seq, uint64_t max_seq); - if (!resp.ok()) { - LOG(ERROR) << "get replica state fail"; - res.end("get replica state fail"); - }; - - for (auto &txn : *resp) { - BatchUserRequest request; - KVRequest kv_request; - if (request.ParseFromString(txn.second)) { - // transactions - for (auto &sub_req : request.user_requests()) { - kv_request.ParseFromString(sub_req.request().data()); - std::string kv_request_json = ParseKVRequest(kv_request); - } - - // see resilientdb/common/utils/utils.cpp - first_commit_time_ = request.createtime() / 1000000; - } - } - } - - uint64_t epoch_time = - std::chrono::duration_cast<std::chrono::seconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - uint64_t chain_age = - first_commit_time_ == 0 ? 0 : epoch_time - first_commit_time_; - - auto block_num_resp = txn_client_.GetBlockNumbers(); - if (!block_num_resp.ok()) { - LOG(ERROR) << "get number fail"; - exit(1); - } - - std::string values = ""; - values.append( - "[{ \"replicaNum\": " + std::to_string(replica_num) + - ", \"workerNum\" : " + std::to_string(worker_num) + - ", \"clientBatchNum\" : " + std::to_string(client_batch_num) + - ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) + - ", \"clientBatchWaitTime\" : " + - std::to_string(client_batch_wait_time) + - ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) + - ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) + - ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) + - ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) + - ", \"maxMaliciousReplicaNum\" : " + - std::to_string(max_malicious_replica_num) + - ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) + - ", \"transactionNum\" : " + std::to_string(num_transactions_) + - ", \"blockNum\" : " + std::to_string(*block_num_resp) + - ", \"chainAge\" : " + std::to_string(chain_age) + "}]"); - LOG(INFO) << std::string(values.c_str()); - res.set_header("Content-Type", "application/json"); - res.end(std::string(values.c_str())); - }); + // CROW_ROUTE(app, "/populatetable") + // ([this](const crow::request &req, response &res) { + // absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState(); + // if (!state_or.ok()) { + // LOG(ERROR) << "get state fail"; + // res.set_header("Content-Type", "application/json"); + // res.end(""); + // return; + // } + // + // ResConfigData config_data = (*state_or).replica_config(); + // ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), + // CertificateInfo()); + // + // uint32_t replica_num = server_config.GetReplicaNum(); + // uint32_t worker_num = server_config.GetWorkerNum(); + // uint32_t client_batch_num = server_config.ClientBatchNum(); + // uint32_t max_process_txn = server_config.GetMaxProcessTxn(); + // uint32_t client_batch_wait_time = server_config.ClientBatchWaitTimeMS(); + // uint32_t input_worker_num = server_config.GetInputWorkerNum(); + // uint32_t output_worker_num = server_config.GetOutputWorkerNum(); + // int client_timeout_ms = server_config.GetClientTimeoutMs(); + // int min_data_receive_num = server_config.GetMinDataReceiveNum(); + // size_t max_malicious_replica_num = + // server_config.GetMaxMaliciousReplicaNum(); + // int checkpoint_water_mark = server_config.GetCheckPointWaterMark(); + // + // // Don't read the ledger if first commit time is known + // if (first_commit_time_ == 0) { + // // Get first block in the chain + // auto resp = txn_client_.GetTxn(1, 1); + // // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // // uint64_t min_seq, uint64_t max_seq); + // if (!resp.ok()) { + // LOG(ERROR) << "get replica state fail"; + // res.end("get replica state fail"); + // }; + // + // for (auto &txn : *resp) { + // BatchUserRequest request; + // KVRequest kv_request; + // if (request.ParseFromString(txn.second)) { + // // transactions + // for (auto &sub_req : request.user_requests()) { + // kv_request.ParseFromString(sub_req.request().data()); + // std::string kv_request_json = ParseKVRequest(kv_request); + // } + // + // // see resilientdb/common/utils/utils.cpp + // first_commit_time_ = request.createtime() / 1000000; + // } + // } + // } + // + // uint64_t epoch_time = + // std::chrono::duration_cast<std::chrono::seconds>( + // std::chrono::system_clock::now().time_since_epoch()) + // .count(); + // uint64_t chain_age = + // first_commit_time_ == 0 ? 0 : epoch_time - first_commit_time_; + // + // auto block_num_resp = txn_client_.GetBlockNumbers(); + // if (!block_num_resp.ok()) { + // LOG(ERROR) << "get number fail"; + // exit(1); + // } + // + // std::string values = ""; + // values.append( + // "[{ \"replicaNum\": " + std::to_string(replica_num) + + // ", \"workerNum\" : " + std::to_string(worker_num) + + // ", \"clientBatchNum\" : " + std::to_string(client_batch_num) + + // ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) + + // ", \"clientBatchWaitTime\" : " + + // std::to_string(client_batch_wait_time) + + // ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) + + // ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) + + // ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) + + // ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) + + // ", \"maxMaliciousReplicaNum\" : " + + // std::to_string(max_malicious_replica_num) + + // ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) + + // ", \"transactionNum\" : " + std::to_string(num_transactions_) + + // ", \"blockNum\" : " + std::to_string(*block_num_resp) + + // ", \"chainAge\" : " + std::to_string(chain_age) + "}]"); + // LOG(INFO) << std::string(values.c_str()); + // res.set_header("Content-Type", "application/json"); + // res.end(std::string(values.c_str())); + // }); // Run the Crow app app.port(port_num_).multithreaded().run(); } -// If batch_size is 1, the function will not add the extra outer [] braces -// Otherwise, a list of lists of blocks will be returned -std::string CrowService::GetAllBlocks(int batch_size, bool increment_txn_count, - bool make_sublists) { - int min_seq = 1; - bool full_batches = true; - - std::string values = "[\n"; - bool first_batch = true; - while (full_batches) { - std::string cur_batch_str = ""; - if (!first_batch) cur_batch_str.append(",\n"); - if (batch_size > 1 && make_sublists) cur_batch_str.append("["); - first_batch = false; - - int max_seq = min_seq + batch_size - 1; - auto resp = txn_client_.GetTxn(min_seq, max_seq); - absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - uint64_t min_seq, uint64_t max_seq); - if (!resp.ok()) { - LOG(ERROR) << "get replica txn fail"; - return ""; - }; - - int cur_size = 0; - bool first_batch_element = true; - for (auto &txn : *resp) { - BatchUserRequest request; - KVRequest kv_request; - cur_size++; - if (request.ParseFromString(txn.second)) { - if (!first_batch_element) cur_batch_str.append(","); - - first_batch_element = false; - - // id - uint64_t seq = txn.first; - cur_batch_str.append("{\"id\": " + std::to_string(seq)); - - // number - cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); - - // transactions - cur_batch_str.append(", \"transactions\": ["); - bool first_transaction = true; - for (auto &sub_req : request.user_requests()) { - kv_request.ParseFromString(sub_req.request().data()); - std::string kv_request_json = ParseKVRequest(kv_request); - - if (!first_transaction) cur_batch_str.append(","); - first_transaction = false; - cur_batch_str.append(kv_request_json); - cur_batch_str.append("\n"); - - if (increment_txn_count) num_transactions_++; - } - cur_batch_str.append("]"); // close transactions list - - // size - cur_batch_str.append(", \"size\": " + - std::to_string(request.ByteSizeLong())); - - // createdAt - uint64_t createtime = request.createtime(); - cur_batch_str.append(", \"createdAt\": \"" + - ParseCreateTime(createtime) + "\""); - } - cur_batch_str.append("}\n"); - } - full_batches = cur_size == batch_size; - if (batch_size > 1 && make_sublists) cur_batch_str.append("]"); - - if (cur_size > 0) values.append(cur_batch_str); - - min_seq += batch_size; - } - - values.append("\n]\n"); - - return values; -} + // If batch_size is 1, the function will not add the extra outer [] braces + // Otherwise, a list of lists of blocks will be returned + // std::string CrowService::GetAllBlocks(int batch_size, bool increment_txn_count, + // bool make_sublists) { + // int min_seq = 1; + // bool full_batches = true; + // + // std::string values = "[\n"; + // bool first_batch = true; + // while (full_batches) { + // std::string cur_batch_str = ""; + // if (!first_batch) cur_batch_str.append(",\n"); + // if (batch_size > 1 && make_sublists) cur_batch_str.append("["); + // first_batch = false; + // + // int max_seq = min_seq + batch_size - 1; + // auto resp = txn_client_.GetTxn(min_seq, max_seq); + // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // uint64_t min_seq, uint64_t max_seq); + // if (!resp.ok()) { + // LOG(ERROR) << "get replica txn fail"; + // return ""; + // }; + // + // int cur_size = 0; + // bool first_batch_element = true; + // for (auto &txn : *resp) { + // BatchUserRequest request; + // KVRequest kv_request; + // cur_size++; + // if (request.ParseFromString(txn.second)) { + // if (!first_batch_element) cur_batch_str.append(","); + // + // first_batch_element = false; + // + // // id + // uint64_t seq = txn.first; + // cur_batch_str.append("{\"id\": " + std::to_string(seq)); + // + // // number + // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); + // + // // transactions + // cur_batch_str.append(", \"transactions\": ["); + // bool first_transaction = true; + // for (auto &sub_req : request.user_requests()) { + // kv_request.ParseFromString(sub_req.request().data()); + // std::string kv_request_json = ParseKVRequest(kv_request); + // + // if (!first_transaction) cur_batch_str.append(","); + // first_transaction = false; + // cur_batch_str.append(kv_request_json); + // cur_batch_str.append("\n"); + // + // if (increment_txn_count) num_transactions_++; + // } + // cur_batch_str.append("]"); // close transactions list + // + // // size + // cur_batch_str.append(", \"size\": " + + // std::to_string(request.ByteSizeLong())); + // + // // createdAt + // uint64_t createtime = request.createtime(); + // cur_batch_str.append(", \"createdAt\": \"" + + // ParseCreateTime(createtime) + "\""); + // } + // cur_batch_str.append("}\n"); + // } + // full_batches = cur_size == batch_size; + // if (batch_size > 1 && make_sublists) cur_batch_str.append("]"); + // + // if (cur_size > 0) values.append(cur_batch_str); + // + // min_seq += batch_size; + // } + // + // values.append("\n]\n"); + // + // return values; + // } // Helper function used by the blocks endpoints to create JSON strings std::string CrowService::ParseKVRequest(const KVRequest &kv_request) { diff --git a/ecosystem/graphql/service/http_server/crow_service.h b/ecosystem/graphql/service/http_server/crow_service.h index 162968e2..b581cb7e 100644 --- a/ecosystem/graphql/service/http_server/crow_service.h +++ b/ecosystem/graphql/service/http_server/crow_service.h @@ -48,7 +48,6 @@ class CrowService { uint16_t port_num_; ResDBKVClient kv_client_; resdb::ResDBStateAccessor state_client_; - resdb::ResDBTxnAccessor txn_client_; std::unordered_set<crow::websocket::connection *> users; std::atomic_uint16_t num_transactions_ = 0; std::atomic_uint64_t first_commit_time_ = 0; diff --git a/ecosystem/graphql/service/http_server/server_config.config b/ecosystem/graphql/service/http_server/server_config.config index 58551cc5..c1054dda 100644 --- a/ecosystem/graphql/service/http_server/server_config.config +++ b/ecosystem/graphql/service/http_server/server_config.config @@ -15,8 +15,25 @@ // specific language governing permissions and limitations // under the License. - -1 127.0.0.1 10001 -2 127.0.0.1 10002 -3 127.0.0.1 10003 -4 127.0.0.1 10004 +{ + replica_info : { + id:1, + ip:"127.0.0.1", + port: 10001, + }, + replica_info : { + id:2, + ip:"127.0.0.1", + port: 10002, + }, + replica_info : { + id:3, + ip:"127.0.0.1", + port: 10003, + }, + replica_info : { + id:4, + ip:"127.0.0.1", + port: 10004, + } +} \ No newline at end of file diff --git a/ecosystem/graphql/service/tools/config/interface/client.config b/ecosystem/graphql/service/tools/config/interface/client.config index 1334de2b..6d338f8b 100644 --- a/ecosystem/graphql/service/tools/config/interface/client.config +++ b/ecosystem/graphql/service/tools/config/interface/client.config @@ -15,4 +15,13 @@ // specific language governing permissions and limitations // under the License. -5 127.0.0.1 10005 +{ + "replica_info": [ + { + "id": 5, + "ip": "127.0.0.1", + "port": 10005 + }, + ] +} + diff --git a/ecosystem/graphql/service/tools/config/interface/service.config b/ecosystem/graphql/service/tools/config/interface/service.config index 3d1f8e9c..6027aa7f 100644 --- a/ecosystem/graphql/service/tools/config/interface/service.config +++ b/ecosystem/graphql/service/tools/config/interface/service.config @@ -15,6 +15,15 @@ // specific language governing permissions and limitations // under the License. -5 127.0.0.1 10005 +{ + "replica_info": [ + { + "id": 5, + "ip": "127.0.0.1", + "port": 10005 + }, + ] +} + diff --git a/ecosystem/graphql/third_party/BUILD b/ecosystem/graphql/third_party/BUILD index 25da0d1f..f10e8899 100644 --- a/ecosystem/graphql/third_party/BUILD +++ b/ecosystem/graphql/third_party/BUILD @@ -35,3 +35,10 @@ cc_library( "@rapidjson", ], ) + +cc_library( + name = "json", + deps = [ + "@nlohmann_json//:json", + ], +) diff --git a/ecosystem/graphql/third_party/BUILD b/ecosystem/graphql/third_party/json.BUILD similarity index 78% copy from ecosystem/graphql/third_party/BUILD copy to ecosystem/graphql/third_party/json.BUILD index 25da0d1f..f0fd8e0b 100644 --- a/ecosystem/graphql/third_party/BUILD +++ b/ecosystem/graphql/third_party/json.BUILD @@ -16,22 +16,15 @@ # specific language governing permissions and limitations # under the License. # -# - -package(default_visibility = ["//visibility:public"]) -load("@rules_foreign_cc//foreign_cc:defs.bzl", "configure_make", "make") +licenses(["notice"]) +exports_files(["LICENSE"]) -cc_library( - name = "crow", - deps = [ - "@com_crowcpp_crow//:crow", - ], -) +package(default_visibility = ["//visibility:public"]) cc_library( - name = "rapidjson", - deps = [ - "@rapidjson", - ], -) + name = "json", + hdrs = glob(["single_include/nlohmann/*.hpp"]), + includes = ["single_include"], + visibility = ["//visibility:public"], +) \ No newline at end of file diff --git a/service/tools/config/interface/service.config b/service/tools/config/interface/service.config index a437dbc7..70bad59b 100644 --- a/service/tools/config/interface/service.config +++ b/service/tools/config/interface/service.config @@ -1,3 +1,4 @@ + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -16,13 +17,11 @@ // under the License. { -"replica_info":[ -{ -"id":5, -"ip":"127.0.0.1", -"port":17005 -} -] + "replica_info": [ + { + "id": 5, + "ip": "127.0.0.1", + "port": 10005 + }, + ] } - - diff --git a/service/tools/config/server.config b/service/tools/config/server.config index 756dc00f..a68d1cec 100644 --- a/service/tools/config/server.config +++ b/service/tools/config/server.config @@ -1,21 +1,5 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -1 127.0.0.1 20001 -2 127.0.0.1 20002 -3 127.0.0.1 20003 -4 127.0.0.1 20004 +1 127.0.0.1 10001 +2 127.0.0.1 10002 +3 127.0.0.1 10003 +4 127.0.0.1 10004 diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config index 9aa3e6fd..978deff8 100644 --- a/service/tools/config/server/server.config +++ b/service/tools/config/server/server.config @@ -46,10 +46,14 @@ enable_block_cache: true, block_cache_capacity: 100 }, - require_txn_validation:true, - enable_viewchange:false, - enable_resview:true, - enable_faulty_switch:false -} - - + clientBatchNum: 100, + enable_viewchange: true, + enable_resview: true, + enable_faulty_switch: true, + recovery_enabled: true, + max_client_complaint_num: 10, + max_process_txn: 2048, + worker_num: 2, + input_worker_num: 1, + output_worker_num: 10 +} \ No newline at end of file diff --git a/service/tools/kv/server_tools/generate_config.sh b/service/tools/kv/server_tools/generate_config.sh index 5f494434..14641d70 100755 --- a/service/tools/kv/server_tools/generate_config.sh +++ b/service/tools/kv/server_tools/generate_config.sh @@ -28,7 +28,7 @@ iplist=( WORKSPACE=$PWD CERT_PATH=$PWD/service/tools/data/cert/ CONFIG_PATH=$PWD/service/tools/config/ -PORT_BASE=20000 +PORT_BASE=10000 CLIENT_NUM=1 ./service/tools/config/generate_config.sh ${WORKSPACE} ${CERT_PATH} ${CERT_PATH} ${CONFIG_PATH} ${CERT_PATH} ${CLIENT_NUM} ${PORT_BASE} ${iplist[@]} diff --git a/service/tools/kv/server_tools/start_replica.sh b/service/tools/kv/server_tools/start_replica.sh new file mode 100755 index 00000000..a2af1615 --- /dev/null +++ b/service/tools/kv/server_tools/start_replica.sh @@ -0,0 +1,61 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Usage: ./start_replica.sh <replica_number> +# Example: ./start_replica.sh 1 (starts node 1 on port 8090) + +if [ -z "$1" ]; then + echo "Usage: $0 <replica_number>" + echo "Example: $0 1 (starts replica 1)" + echo " $0 2 (starts replica 2)" + echo " $0 3 (starts replica 3)" + echo " $0 4 (starts replica 4)" + echo " $0 5 (starts replica 5)" + exit 1 +fi + +REPLICA_NUM=$1 + +# Validate replica number +if ! [[ "$REPLICA_NUM" =~ ^[1-5]$ ]]; then + echo "Error: Replica number must be between 1 and 5" + exit 1 +fi + +SERVER_PATH=./bazel-bin/service/kv/kv_service +SERVER_CONFIG=service/tools/config/server/server.config +WORK_PATH=$PWD +CERT_PATH=${WORK_PATH}/service/tools/data/cert/ +PORT=$((8089 + REPLICA_NUM)) +LOG_FILE="server$((REPLICA_NUM - 1)).log" + +echo "Starting replica $REPLICA_NUM on port $PORT..." +echo "Log file: $LOG_FILE" + +# Build if binary doesn't exist +if [ ! -f "$SERVER_PATH" ]; then + echo "Building kv_service..." + bazel build //service/kv:kv_service --define enable_leveldb=True +fi + +# Start the replica +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node${REPLICA_NUM}.key.pri $CERT_PATH/cert_${REPLICA_NUM}.cert $PORT > $LOG_FILE & + +echo "Replica $REPLICA_NUM started with PID: $!" diff --git a/service/tools/kv/server_tools/stop_replica.sh b/service/tools/kv/server_tools/stop_replica.sh new file mode 100644 index 00000000..67cd3cce --- /dev/null +++ b/service/tools/kv/server_tools/stop_replica.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Kill a specific replica or all kv_service processes +# Usage: ./stop_replica.sh [replica_number] +# Example: ./stop_replica.sh 1 (kills only replica 1) +# ./stop_replica.sh (kills all replicas) +# + +if [ -z "$1" ]; then + echo "Killing all kv_service processes..." + killall -9 kv_service + echo "Done" +else + REPLICA_NUM=$1 + if ! [[ "$REPLICA_NUM" =~ ^[1-5]$ ]]; then + echo "Error: Replica number must be between 1 and 5" + exit 1 + fi + + PORT=$((8089 + REPLICA_NUM)) + echo "Killing kv_service on port $PORT (replica $REPLICA_NUM)..." + + # Find and kill process on specific port + PID=$(lsof -i :$PORT -t 2>/dev/null) + if [ -z "$PID" ]; then + echo "No process found on port $PORT" + else + kill -9 $PID + echo "Killed PID: $PID" + fi +fi
