This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch development in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit dc2e4a83733a72240b268bb8c38bc59c4902f92f Author: harish876 <[email protected]> AuthorDate: Mon Jan 19 22:11:54 2026 +0000 Add Bazel configuration for C++17 and optimization; refactor JSONScalar implementation; update service configuration for replicas --- ecosystem/graphql/.bazelrc | 5 + ecosystem/graphql/WORKSPACE | 28 + ecosystem/graphql/app.py | 15 +- .../graphql/service/http_server/crow_service.cpp | 1046 ++++++++++---------- .../graphql/service/http_server/crow_service.h | 1 - .../service/http_server/server_config.config | 26 +- .../service/tools/config/interface/client.config | 11 +- .../service/tools/config/interface/service.config | 10 +- ecosystem/graphql/third_party/BUILD | 7 + .../graphql/third_party/{BUILD => json.BUILD} | 23 +- 10 files changed, 617 insertions(+), 555 deletions(-) diff --git a/ecosystem/graphql/.bazelrc b/ecosystem/graphql/.bazelrc new file mode 100644 index 00000000..dceba3d5 --- /dev/null +++ b/ecosystem/graphql/.bazelrc @@ -0,0 +1,5 @@ +build --cxxopt='-std=c++17' --copt=-O3 --jobs=8 +build:profile --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 + +#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" +#build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10" \ No newline at end of file diff --git a/ecosystem/graphql/WORKSPACE b/ecosystem/graphql/WORKSPACE index 8ebac612..cd02dec7 100644 --- a/ecosystem/graphql/WORKSPACE +++ b/ecosystem/graphql/WORKSPACE @@ -19,6 +19,21 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") +http_archive( + name = "hedron_compile_commands", + #Replace the commit hash (4f28899228fb3ad0126897876f147ca15026151e) with the latest commit hash from the repo + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/4f28899228fb3ad0126897876f147ca15026151e.tar.gz", + strip_prefix = "bazel-compile-commands-extractor-4f28899228fb3ad0126897876f147ca15026151e", +) +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") +hedron_compile_commands_setup() +load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive") +hedron_compile_commands_setup_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive_transitive() + http_archive( name = "rules_foreign_cc", sha256 = "69023642d5781c68911beda769f91fcbc8ca48711db935a75da7f6536b65047f", @@ -238,6 +253,19 @@ http_archive( ], ) +http_archive( + name = "nlohmann_json", + build_file = "//third_party:json.BUILD", + sha256 = "95651d7d1fcf2e5c3163c3d37df6d6b3e9e5027299e6bd050d157322ceda9ac9", + strip_prefix = "json-3.11.2", + url = "https://github.com/nlohmann/json/archive/refs/tags/v3.11.2.zip", +) + +bind( + name = "json", + actual = "@nlohmann_json//:json", +) + load("@com_resdb_nexres//:repositories.bzl", "nexres_repositories") nexres_repositories() diff --git a/ecosystem/graphql/app.py b/ecosystem/graphql/app.py index c5e6900b..331f08ce 100644 --- a/ecosystem/graphql/app.py +++ b/ecosystem/graphql/app.py @@ -40,15 +40,12 @@ CORS(app) # This will enable CORS for all routes from strawberry.flask.views import GraphQLView [email protected](description="Custom JSON scalar") -class JSONScalar: - @staticmethod - def serialize(value: Any) -> Any: - return value # Directly return the JSON object - - @staticmethod - def parse_value(value: Any) -> Any: - return value # Accept JSON as is +JSONScalar = strawberry.scalar( + typing.NewType("JSONScalar", typing.Any), + serialize=lambda v: v, + parse_value=lambda v: v, + description="Custom JSON scalar" +) @strawberry.type class RetrieveTransaction: diff --git a/ecosystem/graphql/service/http_server/crow_service.cpp b/ecosystem/graphql/service/http_server/crow_service.cpp index e7054634..cf97d30c 100644 --- a/ecosystem/graphql/service/http_server/crow_service.cpp +++ b/ecosystem/graphql/service/http_server/crow_service.cpp @@ -17,527 +17,525 @@ * under the License. */ -#include "service/http_server/crow_service.h" - -#include <glog/logging.h> -#include <rapidjson/document.h> -#include <rapidjson/stringbuffer.h> -#include <rapidjson/writer.h> -#include <stdint.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <unistd.h> - -#include <chrono> -#include <ctime> -#include <fstream> -#include <memory> -#include <mutex> -#include <string> -#include <thread> -#include <unordered_set> - -using crow::request; -using crow::response; -using resdb::BatchUserRequest; -using resdb::CertificateInfo; -using resdb::KeyInfo; -using resdb::ReplicaInfo; -using resdb::ReplicaState; -using resdb::ResConfigData; -using resdb::ResDBConfig; - -namespace sdk { - -CrowService::CrowService(ResDBConfig client_config, ResDBConfig server_config, - uint16_t port_num) - : client_config_(client_config), - server_config_(server_config), - port_num_(port_num), - kv_client_(client_config_), - state_client_(client_config_), - txn_client_(server_config_) { - GetAllBlocks(100, true, false); -} - -void CrowService::run() { - crow::SimpleApp app; - - // For adding and removing websocket connections - std::mutex mtx; - - // Get all values - CROW_ROUTE(app, "/v1/transactions") - ([this](const crow::request &req, response &res) { - uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) { - res.code = 503; - res.set_header("Content-Type", "text/plain"); - res.end( - "Get all transactions functionality on cooldown (" + - std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) + - " ms left)"); - } else { - last_db_scan_time = cur_time; - - auto values = kv_client_.GetAllValues(); - if (values != nullptr) { - // LOG(INFO) << "client getallvalues value = " << values->c_str(); - - // Send updated blocks list to websocket - if (users.size() > 0) { - for (auto u : users) u->send_text("Update blocks"); - } - - num_transactions_++; - - res.set_header("Content-Type", "application/json"); - res.end(std::string(values->c_str())); - } else { - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("getallvalues fail"); - } - } - }); - - // Get value of specific id - CROW_ROUTE(app, "/v1/transactions/<string>") - ([this](const crow::request &req, response &res, std::string id) { - auto value = kv_client_.Get(id); - if (value != nullptr) { - LOG(INFO) << "client get value = " << value->c_str(); - - // Send updated blocks list to websocket - if (users.size() > 0) { - for (auto u : users) u->send_text("Update blocks"); - } - - num_transactions_++; - - res.set_header("Content-Type", "application/json"); - res.end(std::string(value->c_str())); - } else { - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("get value fail"); - } - }); - - // Get values based on key range - CROW_ROUTE(app, "/v1/transactions/<string>/<string>") - ([this](const crow::request &req, response &res, std::string min_id, - std::string max_id) { - auto value = kv_client_.GetRange(min_id, max_id); - if (value != nullptr) { - LOG(INFO) << "client getrange value = " << value->c_str(); - - // Send updated blocks list to websocket - if (users.size() > 0) { - for (auto u : users) u->send_text("Update blocks"); - } - - num_transactions_++; - - res.set_header("Content-Type", "application/json"); - res.end(std::string(value->c_str())); - } else { - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("getrange fail"); - } - }); - - // Commit a key-value pair, extracting the id parameter from the JSON - // object and setting the value as the entire JSON object - CROW_ROUTE(app, "/v1/transactions/commit") - .methods("POST"_method)([this](const request &req) { - std::string body = req.body; - LOG(INFO) << "body: " << body; - - // Parse transaction JSON - rapidjson::Document doc; - doc.Parse(body.c_str()); - if (!doc.IsObject() || !doc.HasMember("id")) { - response res(400, "Invalid transaction format"); // Bad Request - res.set_header("Content-Type", "text/plain"); - return res; - } - const std::string id = doc["id"].GetString(); - const std::string value = body; - - // Set key-value pair in kv server - int retval = kv_client_.Set(id, value); - - if (retval != 0) { - LOG(ERROR) << "Error when trying to commit id " << id; - response res(500, "id: " + id); - res.set_header("Content-Type", "text/plain"); - return res; - } - LOG(INFO) << "Set " << id << " to " << value; - - // Send updated blocks list to websocket - if (users.size() > 0) { - for (auto u : users) u->send_text("Update blocks"); - } - - num_transactions_++; - - response res(201, "id: " + id); // Created status code - res.set_header("Content-Type", "text/plain"); - return res; - }); - - CROW_ROUTE(app, "/v1/blocks") - ([this](const crow::request &req, response &res) { - auto values = GetAllBlocks(100); - res.set_header("Content-Type", "application/json"); - res.end(values); - }); - - // Retrieve blocks in batches of size of the int parameter - CROW_ROUTE(app, "/v1/blocks/<int>") - ([this](const crow::request &req, response &res, int batch_size) { - auto values = GetAllBlocks(batch_size, false, true); - if (values == "") { - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("get replica state fail"); - exit(1); - }; - res.set_header("Content-Type", "application/json"); - res.end(values); - }); - - // Retrieve blocks within a range - CROW_ROUTE(app, "/v1/blocks/<int>/<int>") - ([this](const crow::request &req, response &res, int min_seq, int max_seq) { - auto resp = txn_client_.GetTxn(min_seq, max_seq); - absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - uint64_t min_seq, uint64_t max_seq); - if (!resp.ok()) { - LOG(ERROR) << "get replica state fail"; - res.code = 500; - res.set_header("Content-Type", "text/plain"); - res.end("get replica state fail"); - exit(1); - } - - std::string values = "[\n"; - bool first_iteration = true; - for (auto &txn : *resp) { - BatchUserRequest request; - KVRequest kv_request; - - std::string cur_batch_str = ""; - if (request.ParseFromString(txn.second)) { - LOG(INFO) << request.DebugString(); - - if (!first_iteration) cur_batch_str.append(","); - first_iteration = false; - - // id - uint64_t seq = txn.first; - cur_batch_str.append("{\"id\": " + std::to_string(seq)); - - // number - cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); - - // transactions - cur_batch_str.append(", \"transactions\": ["); - bool first_transaction = true; - for (auto &sub_req : request.user_requests()) { - kv_request.ParseFromString(sub_req.request().data()); - std::string kv_request_json = ParseKVRequest(kv_request); - - if (!first_transaction) cur_batch_str.append(","); - first_transaction = false; - cur_batch_str.append(kv_request_json); - cur_batch_str.append("\n"); - } - cur_batch_str.append("]"); // close transactions list - - // size - cur_batch_str.append(", \"size\": " + - std::to_string(request.ByteSizeLong())); - - // createdAt - uint64_t createtime = request.createtime(); - cur_batch_str.append(", \"createdAt\": \"" + - ParseCreateTime(createtime) + "\""); - } - cur_batch_str.append("}\n"); - values.append(cur_batch_str); - } - values.append("]\n"); - res.set_header("Content-Type", "application/json"); - res.end(values); - }); - - CROW_ROUTE(app, "/blockupdatelistener") - .websocket() - .onopen([&](crow::websocket::connection &conn) { - LOG(INFO) << "Opened websocket"; - std::lock_guard<std::mutex> _(mtx); - users.insert(&conn); - }) - .onclose( - [&](crow::websocket::connection &conn, const std::string &reason) { - LOG(INFO) << "Closed websocket"; - std::lock_guard<std::mutex> _(mtx); - users.erase(&conn); - }) - .onmessage([&](crow::websocket::connection & /*conn*/, - const std::string &data, bool is_binary) { - // do nothing - }); - - // For metadata table on the Explorer - CROW_ROUTE(app, "/populatetable") - ([this](const crow::request &req, response &res) { - absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState(); - if (!state_or.ok()) { - LOG(ERROR) << "get state fail"; - res.set_header("Content-Type", "application/json"); - res.end(""); - return; - } - - ResConfigData config_data = (*state_or).replica_config(); - ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), - CertificateInfo()); - - uint32_t replica_num = server_config.GetReplicaNum(); - uint32_t worker_num = server_config.GetWorkerNum(); - uint32_t client_batch_num = server_config.ClientBatchNum(); - uint32_t max_process_txn = server_config.GetMaxProcessTxn(); - uint32_t client_batch_wait_time = server_config.ClientBatchWaitTimeMS(); - uint32_t input_worker_num = server_config.GetInputWorkerNum(); - uint32_t output_worker_num = server_config.GetOutputWorkerNum(); - int client_timeout_ms = server_config.GetClientTimeoutMs(); - int min_data_receive_num = server_config.GetMinDataReceiveNum(); - size_t max_malicious_replica_num = - server_config.GetMaxMaliciousReplicaNum(); - int checkpoint_water_mark = server_config.GetCheckPointWaterMark(); - - // Don't read the ledger if first commit time is known - if (first_commit_time_ == 0) { - // Get first block in the chain - auto resp = txn_client_.GetTxn(1, 1); - // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - // uint64_t min_seq, uint64_t max_seq); - if (!resp.ok()) { - LOG(ERROR) << "get replica state fail"; - res.end("get replica state fail"); - }; - - for (auto &txn : *resp) { - BatchUserRequest request; - KVRequest kv_request; - if (request.ParseFromString(txn.second)) { - // transactions - for (auto &sub_req : request.user_requests()) { - kv_request.ParseFromString(sub_req.request().data()); - std::string kv_request_json = ParseKVRequest(kv_request); - } - - // see resilientdb/common/utils/utils.cpp - first_commit_time_ = request.createtime() / 1000000; - } - } - } - - uint64_t epoch_time = - std::chrono::duration_cast<std::chrono::seconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - uint64_t chain_age = - first_commit_time_ == 0 ? 0 : epoch_time - first_commit_time_; - - auto block_num_resp = txn_client_.GetBlockNumbers(); - if (!block_num_resp.ok()) { - LOG(ERROR) << "get number fail"; - exit(1); - } - - std::string values = ""; - values.append( - "[{ \"replicaNum\": " + std::to_string(replica_num) + - ", \"workerNum\" : " + std::to_string(worker_num) + - ", \"clientBatchNum\" : " + std::to_string(client_batch_num) + - ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) + - ", \"clientBatchWaitTime\" : " + - std::to_string(client_batch_wait_time) + - ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) + - ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) + - ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) + - ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) + - ", \"maxMaliciousReplicaNum\" : " + - std::to_string(max_malicious_replica_num) + - ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) + - ", \"transactionNum\" : " + std::to_string(num_transactions_) + - ", \"blockNum\" : " + std::to_string(*block_num_resp) + - ", \"chainAge\" : " + std::to_string(chain_age) + "}]"); - LOG(INFO) << std::string(values.c_str()); - res.set_header("Content-Type", "application/json"); - res.end(std::string(values.c_str())); - }); - - // Run the Crow app - app.port(port_num_).multithreaded().run(); -} - -// If batch_size is 1, the function will not add the extra outer [] braces -// Otherwise, a list of lists of blocks will be returned -std::string CrowService::GetAllBlocks(int batch_size, bool increment_txn_count, - bool make_sublists) { - int min_seq = 1; - bool full_batches = true; - - std::string values = "[\n"; - bool first_batch = true; - while (full_batches) { - std::string cur_batch_str = ""; - if (!first_batch) cur_batch_str.append(",\n"); - if (batch_size > 1 && make_sublists) cur_batch_str.append("["); - first_batch = false; - - int max_seq = min_seq + batch_size - 1; - auto resp = txn_client_.GetTxn(min_seq, max_seq); - absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - uint64_t min_seq, uint64_t max_seq); - if (!resp.ok()) { - LOG(ERROR) << "get replica txn fail"; - return ""; - }; - - int cur_size = 0; - bool first_batch_element = true; - for (auto &txn : *resp) { - BatchUserRequest request; - KVRequest kv_request; - cur_size++; - if (request.ParseFromString(txn.second)) { - if (!first_batch_element) cur_batch_str.append(","); - - first_batch_element = false; - - // id - uint64_t seq = txn.first; - cur_batch_str.append("{\"id\": " + std::to_string(seq)); - - // number - cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); - - // transactions - cur_batch_str.append(", \"transactions\": ["); - bool first_transaction = true; - for (auto &sub_req : request.user_requests()) { - kv_request.ParseFromString(sub_req.request().data()); - std::string kv_request_json = ParseKVRequest(kv_request); - - if (!first_transaction) cur_batch_str.append(","); - first_transaction = false; - cur_batch_str.append(kv_request_json); - cur_batch_str.append("\n"); - - if (increment_txn_count) num_transactions_++; - } - cur_batch_str.append("]"); // close transactions list - - // size - cur_batch_str.append(", \"size\": " + - std::to_string(request.ByteSizeLong())); - - // createdAt - uint64_t createtime = request.createtime(); - cur_batch_str.append(", \"createdAt\": \"" + - ParseCreateTime(createtime) + "\""); - } - cur_batch_str.append("}\n"); - } - full_batches = cur_size == batch_size; - if (batch_size > 1 && make_sublists) cur_batch_str.append("]"); - - if (cur_size > 0) values.append(cur_batch_str); - - min_seq += batch_size; - } - - values.append("\n]\n"); - - return values; -} - -// Helper function used by the blocks endpoints to create JSON strings -std::string CrowService::ParseKVRequest(const KVRequest &kv_request) { - rapidjson::Document doc; - if (kv_request.cmd() == 1) { // SET - doc.SetObject(); - rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); - rapidjson::Value val(rapidjson::kObjectType); - doc.AddMember("cmd", "SET", allocator); - doc.AddMember("key", kv_request.key(), allocator); - doc.AddMember("value", kv_request.value(), allocator); - } else if (kv_request.cmd() == 2) { // GET - doc.SetObject(); - rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); - rapidjson::Value val(rapidjson::kObjectType); - doc.AddMember("cmd", "GET", allocator); - doc.AddMember("key", kv_request.key(), allocator); - } else if (kv_request.cmd() == 3) { // GETALLVALUES - doc.SetObject(); - rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); - rapidjson::Value val(rapidjson::kObjectType); - doc.AddMember("cmd", "GETALLVALUES", allocator); - } else if (kv_request.cmd() == 4) { // GETRANGE - doc.SetObject(); - rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); - rapidjson::Value val(rapidjson::kObjectType); - doc.AddMember("cmd", "GETRANGE", allocator); - doc.AddMember("min_key", kv_request.key(), allocator); - doc.AddMember("max_key", kv_request.value(), allocator); - } - - rapidjson::StringBuffer buffer; - rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); - doc.Accept(writer); - return buffer.GetString(); -} - -std::string CrowService::ParseCreateTime(uint64_t createtime) { - std::string timestr = ""; - uint64_t sec = - createtime / 1000000; // see resilientdb/common/utils/utils.cpp - - std::tm *tm_gmt = std::gmtime((time_t *)&sec); - int year = tm_gmt->tm_year + 1900; - int month = tm_gmt->tm_mon; // 0-indexed - int day = tm_gmt->tm_mday; - - std::string months[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}; - - // Using date time string format to support the Explorer transaction chart - if (day < 10) timestr += "0"; - timestr += std::to_string(day) + " " + months[month] + " " + - std::to_string(year) + " "; - - std::string hour_str = std::to_string(tm_gmt->tm_hour); - std::string min_str = std::to_string(tm_gmt->tm_min); - std::string sec_str = std::to_string(tm_gmt->tm_sec); - - if (tm_gmt->tm_hour < 10) hour_str = "0" + hour_str; - if (tm_gmt->tm_min < 10) min_str = "0" + min_str; - if (tm_gmt->tm_sec < 10) sec_str = "0" + sec_str; - - timestr += hour_str + ":" + min_str + ":" + sec_str + " GMT"; - - return timestr; -} - -} // namespace sdk + #include "service/http_server/crow_service.h" + + #include <glog/logging.h> + #include <rapidjson/document.h> + #include <rapidjson/stringbuffer.h> + #include <rapidjson/writer.h> + #include <stdint.h> + #include <sys/stat.h> + #include <sys/types.h> + #include <unistd.h> + + #include <chrono> + #include <ctime> + #include <fstream> + #include <memory> + #include <mutex> + #include <string> + #include <thread> + #include <unordered_set> + + using crow::request; + using crow::response; + using resdb::BatchUserRequest; + using resdb::CertificateInfo; + using resdb::KeyInfo; + using resdb::ReplicaInfo; + using resdb::ReplicaState; + using resdb::ResConfigData; + using resdb::ResDBConfig; + + namespace sdk { + + CrowService::CrowService(ResDBConfig client_config, ResDBConfig server_config, + uint16_t port_num) + : client_config_(client_config), + server_config_(server_config), + port_num_(port_num), + kv_client_(client_config_), + state_client_(client_config_) { + } + + void CrowService::run() { + crow::SimpleApp app; + + // For adding and removing websocket connections + std::mutex mtx; + + // Get all values + // CROW_ROUTE(app, "/v1/transactions") + // ([this](const crow::request &req, response &res) { + // uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( + // std::chrono::system_clock::now().time_since_epoch()) + // .count(); + + // if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) { + // res.code = 503; + // res.set_header("Content-Type", "text/plain"); + // res.end( + // "Get all transactions functionality on cooldown (" + + // std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) + + // " ms left)"); + // } else { + // last_db_scan_time = cur_time; + + // auto values = kv_client_.GetAllValues(); + // if (values != nullptr) { + // // LOG(INFO) << "client getallvalues value = " << values->c_str(); + + // // Send updated blocks list to websocket + // if (users.size() > 0) { + // for (auto u : users) u->send_text("Update blocks"); + // } + + // num_transactions_++; + + // res.set_header("Content-Type", "application/json"); + // res.end(std::string(values->c_str())); + // } else { + // res.code = 500; + // res.set_header("Content-Type", "text/plain"); + // res.end("getallvalues fail"); + // } + // } + // }); + + // Get value of specific id + CROW_ROUTE(app, "/v1/transactions/<string>") + ([this](const crow::request &req, response &res, std::string id) { + auto value = kv_client_.Get(id); + if (value != nullptr) { + LOG(INFO) << "client get value = " << value->c_str(); + + // Send updated blocks list to websocket + if (users.size() > 0) { + for (auto u : users) u->send_text("Update blocks"); + } + + num_transactions_++; + + res.set_header("Content-Type", "application/json"); + res.end(std::string(value->c_str())); + } else { + res.code = 500; + res.set_header("Content-Type", "text/plain"); + res.end("get value fail"); + } + }); + + // Get values based on key range + CROW_ROUTE(app, "/v1/transactions/<string>/<string>") + ([this](const crow::request &req, response &res, std::string min_id, + std::string max_id) { + auto value = kv_client_.GetRange(min_id, max_id); + if (value != nullptr) { + LOG(INFO) << "client getrange value = " << value->c_str(); + + // Send updated blocks list to websocket + if (users.size() > 0) { + for (auto u : users) u->send_text("Update blocks"); + } + + num_transactions_++; + + res.set_header("Content-Type", "application/json"); + res.end(std::string(value->c_str())); + } else { + res.code = 500; + res.set_header("Content-Type", "text/plain"); + res.end("getrange fail"); + } + }); + + // Commit a key-value pair, extracting the id parameter from the JSON + // object and setting the value as the entire JSON object + CROW_ROUTE(app, "/v1/transactions/commit") + .methods("POST"_method)([this](const request &req) { + std::string body = req.body; + LOG(INFO) << "body: " << body; + + // Parse transaction JSON + rapidjson::Document doc; + doc.Parse(body.c_str()); + if (!doc.IsObject() || !doc.HasMember("id")) { + response res(400, "Invalid transaction format"); // Bad Request + res.set_header("Content-Type", "text/plain"); + return res; + } + const std::string id = doc["id"].GetString(); + const std::string value = body; + + // Set key-value pair in kv server + int retval = kv_client_.Set(id, value); + + if (retval != 0) { + LOG(ERROR) << "Error when trying to commit id " << id; + response res(500, "id: " + id); + res.set_header("Content-Type", "text/plain"); + return res; + } + LOG(INFO) << "Set " << id << " to " << value; + + // Send updated blocks list to websocket + if (users.size() > 0) { + for (auto u : users) u->send_text("Update blocks"); + } + + num_transactions_++; + + response res(201, "id: " + id); // Created status code + res.set_header("Content-Type", "text/plain"); + return res; + }); + + // CROW_ROUTE(app, "/v1/blocks") + // ([this](const crow::request &req, response &res) { + // auto values = GetAllBlocks(100); + // res.set_header("Content-Type", "application/json"); + // res.end(values); + // }); + + // Retrieve blocks in batches of size of the int parameter + // CROW_ROUTE(app, "/v1/blocks/<int>") + // ([this](const crow::request &req, response &res, int batch_size) { + // auto values = GetAllBlocks(batch_size, false, true); + // if (values == "") { + // res.code = 500; + // res.set_header("Content-Type", "text/plain"); + // res.end("get replica state fail"); + // exit(1); + // }; + // res.set_header("Content-Type", "application/json"); + // res.end(values); + // }); + + // Retrieve blocks within a range + // CROW_ROUTE(app, "/v1/blocks/<int>/<int>") + // ([this](const crow::request &req, response &res, int min_seq, int max_seq) { + // auto resp = txn_client_.GetTxn(min_seq, max_seq); + // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // uint64_t min_seq, uint64_t max_seq); + // if (!resp.ok()) { + // LOG(ERROR) << "get replica state fail"; + // res.code = 500; + // res.set_header("Content-Type", "text/plain"); + // res.end("get replica state fail"); + // exit(1); + // } + // + // std::string values = "[\n"; + // bool first_iteration = true; + // for (auto &txn : *resp) { + // BatchUserRequest request; + // KVRequest kv_request; + // + // std::string cur_batch_str = ""; + // if (request.ParseFromString(txn.second)) { + // LOG(INFO) << request.DebugString(); + // + // if (!first_iteration) cur_batch_str.append(","); + // first_iteration = false; + // + // // id + // uint64_t seq = txn.first; + // cur_batch_str.append("{\"id\": " + std::to_string(seq)); + // + // // number + // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); + // + // // transactions + // cur_batch_str.append(", \"transactions\": ["); + // bool first_transaction = true; + // for (auto &sub_req : request.user_requests()) { + // kv_request.ParseFromString(sub_req.request().data()); + // std::string kv_request_json = ParseKVRequest(kv_request); + // + // if (!first_transaction) cur_batch_str.append(","); + // first_transaction = false; + // cur_batch_str.append(kv_request_json); + // cur_batch_str.append("\n"); + // } + // cur_batch_str.append("]"); // close transactions list + // + // // size + // cur_batch_str.append(", \"size\": " + + // std::to_string(request.ByteSizeLong())); + // + // // createdAt + // uint64_t createtime = request.createtime(); + // cur_batch_str.append(", \"createdAt\": \"" + + // ParseCreateTime(createtime) + "\""); + // } + // cur_batch_str.append("}\n"); + // values.append(cur_batch_str); + // } + // values.append("]\n"); + // res.set_header("Content-Type", "application/json"); + // res.end(values); + // }); + + CROW_ROUTE(app, "/blockupdatelistener") + .websocket() + .onopen([&](crow::websocket::connection &conn) { + LOG(INFO) << "Opened websocket"; + std::lock_guard<std::mutex> _(mtx); + users.insert(&conn); + }) + .onclose( + [&](crow::websocket::connection &conn, const std::string &reason) { + LOG(INFO) << "Closed websocket"; + std::lock_guard<std::mutex> _(mtx); + users.erase(&conn); + }) + .onmessage([&](crow::websocket::connection & /*conn*/, + const std::string &data, bool is_binary) { + // do nothing + }); + + // For metadata table on the Explorer + // CROW_ROUTE(app, "/populatetable") + // ([this](const crow::request &req, response &res) { + // absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState(); + // if (!state_or.ok()) { + // LOG(ERROR) << "get state fail"; + // res.set_header("Content-Type", "application/json"); + // res.end(""); + // return; + // } + // + // ResConfigData config_data = (*state_or).replica_config(); + // ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), + // CertificateInfo()); + // + // uint32_t replica_num = server_config.GetReplicaNum(); + // uint32_t worker_num = server_config.GetWorkerNum(); + // uint32_t client_batch_num = server_config.ClientBatchNum(); + // uint32_t max_process_txn = server_config.GetMaxProcessTxn(); + // uint32_t client_batch_wait_time = server_config.ClientBatchWaitTimeMS(); + // uint32_t input_worker_num = server_config.GetInputWorkerNum(); + // uint32_t output_worker_num = server_config.GetOutputWorkerNum(); + // int client_timeout_ms = server_config.GetClientTimeoutMs(); + // int min_data_receive_num = server_config.GetMinDataReceiveNum(); + // size_t max_malicious_replica_num = + // server_config.GetMaxMaliciousReplicaNum(); + // int checkpoint_water_mark = server_config.GetCheckPointWaterMark(); + // + // // Don't read the ledger if first commit time is known + // if (first_commit_time_ == 0) { + // // Get first block in the chain + // auto resp = txn_client_.GetTxn(1, 1); + // // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // // uint64_t min_seq, uint64_t max_seq); + // if (!resp.ok()) { + // LOG(ERROR) << "get replica state fail"; + // res.end("get replica state fail"); + // }; + // + // for (auto &txn : *resp) { + // BatchUserRequest request; + // KVRequest kv_request; + // if (request.ParseFromString(txn.second)) { + // // transactions + // for (auto &sub_req : request.user_requests()) { + // kv_request.ParseFromString(sub_req.request().data()); + // std::string kv_request_json = ParseKVRequest(kv_request); + // } + // + // // see resilientdb/common/utils/utils.cpp + // first_commit_time_ = request.createtime() / 1000000; + // } + // } + // } + // + // uint64_t epoch_time = + // std::chrono::duration_cast<std::chrono::seconds>( + // std::chrono::system_clock::now().time_since_epoch()) + // .count(); + // uint64_t chain_age = + // first_commit_time_ == 0 ? 0 : epoch_time - first_commit_time_; + // + // auto block_num_resp = txn_client_.GetBlockNumbers(); + // if (!block_num_resp.ok()) { + // LOG(ERROR) << "get number fail"; + // exit(1); + // } + // + // std::string values = ""; + // values.append( + // "[{ \"replicaNum\": " + std::to_string(replica_num) + + // ", \"workerNum\" : " + std::to_string(worker_num) + + // ", \"clientBatchNum\" : " + std::to_string(client_batch_num) + + // ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) + + // ", \"clientBatchWaitTime\" : " + + // std::to_string(client_batch_wait_time) + + // ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) + + // ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) + + // ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) + + // ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) + + // ", \"maxMaliciousReplicaNum\" : " + + // std::to_string(max_malicious_replica_num) + + // ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) + + // ", \"transactionNum\" : " + std::to_string(num_transactions_) + + // ", \"blockNum\" : " + std::to_string(*block_num_resp) + + // ", \"chainAge\" : " + std::to_string(chain_age) + "}]"); + // LOG(INFO) << std::string(values.c_str()); + // res.set_header("Content-Type", "application/json"); + // res.end(std::string(values.c_str())); + // }); + + // Run the Crow app + app.port(port_num_).multithreaded().run(); + } + + // If batch_size is 1, the function will not add the extra outer [] braces + // Otherwise, a list of lists of blocks will be returned + // std::string CrowService::GetAllBlocks(int batch_size, bool increment_txn_count, + // bool make_sublists) { + // int min_seq = 1; + // bool full_batches = true; + // + // std::string values = "[\n"; + // bool first_batch = true; + // while (full_batches) { + // std::string cur_batch_str = ""; + // if (!first_batch) cur_batch_str.append(",\n"); + // if (batch_size > 1 && make_sublists) cur_batch_str.append("["); + // first_batch = false; + // + // int max_seq = min_seq + batch_size - 1; + // auto resp = txn_client_.GetTxn(min_seq, max_seq); + // absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( + // uint64_t min_seq, uint64_t max_seq); + // if (!resp.ok()) { + // LOG(ERROR) << "get replica txn fail"; + // return ""; + // }; + // + // int cur_size = 0; + // bool first_batch_element = true; + // for (auto &txn : *resp) { + // BatchUserRequest request; + // KVRequest kv_request; + // cur_size++; + // if (request.ParseFromString(txn.second)) { + // if (!first_batch_element) cur_batch_str.append(","); + // + // first_batch_element = false; + // + // // id + // uint64_t seq = txn.first; + // cur_batch_str.append("{\"id\": " + std::to_string(seq)); + // + // // number + // cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\""); + // + // // transactions + // cur_batch_str.append(", \"transactions\": ["); + // bool first_transaction = true; + // for (auto &sub_req : request.user_requests()) { + // kv_request.ParseFromString(sub_req.request().data()); + // std::string kv_request_json = ParseKVRequest(kv_request); + // + // if (!first_transaction) cur_batch_str.append(","); + // first_transaction = false; + // cur_batch_str.append(kv_request_json); + // cur_batch_str.append("\n"); + // + // if (increment_txn_count) num_transactions_++; + // } + // cur_batch_str.append("]"); // close transactions list + // + // // size + // cur_batch_str.append(", \"size\": " + + // std::to_string(request.ByteSizeLong())); + // + // // createdAt + // uint64_t createtime = request.createtime(); + // cur_batch_str.append(", \"createdAt\": \"" + + // ParseCreateTime(createtime) + "\""); + // } + // cur_batch_str.append("}\n"); + // } + // full_batches = cur_size == batch_size; + // if (batch_size > 1 && make_sublists) cur_batch_str.append("]"); + // + // if (cur_size > 0) values.append(cur_batch_str); + // + // min_seq += batch_size; + // } + // + // values.append("\n]\n"); + // + // return values; + // } + + // Helper function used by the blocks endpoints to create JSON strings + std::string CrowService::ParseKVRequest(const KVRequest &kv_request) { + rapidjson::Document doc; + if (kv_request.cmd() == 1) { // SET + doc.SetObject(); + rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); + rapidjson::Value val(rapidjson::kObjectType); + doc.AddMember("cmd", "SET", allocator); + doc.AddMember("key", kv_request.key(), allocator); + doc.AddMember("value", kv_request.value(), allocator); + } else if (kv_request.cmd() == 2) { // GET + doc.SetObject(); + rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); + rapidjson::Value val(rapidjson::kObjectType); + doc.AddMember("cmd", "GET", allocator); + doc.AddMember("key", kv_request.key(), allocator); + } else if (kv_request.cmd() == 3) { // GETALLVALUES + doc.SetObject(); + rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); + rapidjson::Value val(rapidjson::kObjectType); + doc.AddMember("cmd", "GETALLVALUES", allocator); + } else if (kv_request.cmd() == 4) { // GETRANGE + doc.SetObject(); + rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); + rapidjson::Value val(rapidjson::kObjectType); + doc.AddMember("cmd", "GETRANGE", allocator); + doc.AddMember("min_key", kv_request.key(), allocator); + doc.AddMember("max_key", kv_request.value(), allocator); + } + + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + doc.Accept(writer); + return buffer.GetString(); + } + + std::string CrowService::ParseCreateTime(uint64_t createtime) { + std::string timestr = ""; + uint64_t sec = + createtime / 1000000; // see resilientdb/common/utils/utils.cpp + + std::tm *tm_gmt = std::gmtime((time_t *)&sec); + int year = tm_gmt->tm_year + 1900; + int month = tm_gmt->tm_mon; // 0-indexed + int day = tm_gmt->tm_mday; + + std::string months[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", + "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}; + + // Using date time string format to support the Explorer transaction chart + if (day < 10) timestr += "0"; + timestr += std::to_string(day) + " " + months[month] + " " + + std::to_string(year) + " "; + + std::string hour_str = std::to_string(tm_gmt->tm_hour); + std::string min_str = std::to_string(tm_gmt->tm_min); + std::string sec_str = std::to_string(tm_gmt->tm_sec); + + if (tm_gmt->tm_hour < 10) hour_str = "0" + hour_str; + if (tm_gmt->tm_min < 10) min_str = "0" + min_str; + if (tm_gmt->tm_sec < 10) sec_str = "0" + sec_str; + + timestr += hour_str + ":" + min_str + ":" + sec_str + " GMT"; + + return timestr; + } + + } // namespace sdk \ No newline at end of file diff --git a/ecosystem/graphql/service/http_server/crow_service.h b/ecosystem/graphql/service/http_server/crow_service.h index 162968e2..b581cb7e 100644 --- a/ecosystem/graphql/service/http_server/crow_service.h +++ b/ecosystem/graphql/service/http_server/crow_service.h @@ -48,7 +48,6 @@ class CrowService { uint16_t port_num_; ResDBKVClient kv_client_; resdb::ResDBStateAccessor state_client_; - resdb::ResDBTxnAccessor txn_client_; std::unordered_set<crow::websocket::connection *> users; std::atomic_uint16_t num_transactions_ = 0; std::atomic_uint64_t first_commit_time_ = 0; diff --git a/ecosystem/graphql/service/http_server/server_config.config b/ecosystem/graphql/service/http_server/server_config.config index 58551cc5..f202248c 100644 --- a/ecosystem/graphql/service/http_server/server_config.config +++ b/ecosystem/graphql/service/http_server/server_config.config @@ -16,7 +16,25 @@ // under the License. -1 127.0.0.1 10001 -2 127.0.0.1 10002 -3 127.0.0.1 10003 -4 127.0.0.1 10004 +{ + replica_info : { + id:1, + ip:"172.31.57.186", + port: 17001, + }, + replica_info : { + id:2, + ip:"172.31.57.186", + port: 17002, + }, + replica_info : { + id:3, + ip:"172.31.57.186", + port: 17003, + }, + replica_info : { + id:4, + ip:"172.31.57.186", + port: 17004, + } +} \ No newline at end of file diff --git a/ecosystem/graphql/service/tools/config/interface/client.config b/ecosystem/graphql/service/tools/config/interface/client.config index 1334de2b..f994a57e 100644 --- a/ecosystem/graphql/service/tools/config/interface/client.config +++ b/ecosystem/graphql/service/tools/config/interface/client.config @@ -15,4 +15,13 @@ // specific language governing permissions and limitations // under the License. -5 127.0.0.1 10005 +{ + "replica_info": [ + { + "id": 5, + "ip": "172.31.57.186", + "port": 17005 + }, + ] +} + diff --git a/ecosystem/graphql/service/tools/config/interface/service.config b/ecosystem/graphql/service/tools/config/interface/service.config index 3d1f8e9c..fbf1d7bb 100644 --- a/ecosystem/graphql/service/tools/config/interface/service.config +++ b/ecosystem/graphql/service/tools/config/interface/service.config @@ -15,6 +15,14 @@ // specific language governing permissions and limitations // under the License. -5 127.0.0.1 10005 +{ + "replica_info": [ + { + "id": 5, + "ip": "172.31.57.186", + "port": 17005 + }, + ] +} diff --git a/ecosystem/graphql/third_party/BUILD b/ecosystem/graphql/third_party/BUILD index 25da0d1f..3e6e468e 100644 --- a/ecosystem/graphql/third_party/BUILD +++ b/ecosystem/graphql/third_party/BUILD @@ -35,3 +35,10 @@ cc_library( "@rapidjson", ], ) + +cc_library( + name = "json", + deps = [ + "@nlohmann_json//:json", + ], +) \ No newline at end of file diff --git a/ecosystem/graphql/third_party/BUILD b/ecosystem/graphql/third_party/json.BUILD similarity index 78% copy from ecosystem/graphql/third_party/BUILD copy to ecosystem/graphql/third_party/json.BUILD index 25da0d1f..f0fd8e0b 100644 --- a/ecosystem/graphql/third_party/BUILD +++ b/ecosystem/graphql/third_party/json.BUILD @@ -16,22 +16,15 @@ # specific language governing permissions and limitations # under the License. # -# - -package(default_visibility = ["//visibility:public"]) -load("@rules_foreign_cc//foreign_cc:defs.bzl", "configure_make", "make") +licenses(["notice"]) +exports_files(["LICENSE"]) -cc_library( - name = "crow", - deps = [ - "@com_crowcpp_crow//:crow", - ], -) +package(default_visibility = ["//visibility:public"]) cc_library( - name = "rapidjson", - deps = [ - "@rapidjson", - ], -) + name = "json", + hdrs = glob(["single_include/nlohmann/*.hpp"]), + includes = ["single_include"], + visibility = ["//visibility:public"], +) \ No newline at end of file
