This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit a3330963e64aee95646e8643e166e91cf6927f02 Author: bchou9 <[email protected]> AuthorDate: Wed Jan 7 20:25:19 2026 +0000 Added back stats changes --- .gitignore | 3 +- .../consensus/execution/transaction_executor.cpp | 4 + platform/consensus/ordering/pbft/commitment.cpp | 10 +- platform/networkstrate/service_network.cpp | 12 + platform/statistic/BUILD | 5 + platform/statistic/stats.cpp | 403 +++++++++++++++++++-- platform/statistic/stats.h | 28 +- 7 files changed, 438 insertions(+), 27 deletions(-) diff --git a/.gitignore b/.gitignore index be0ae77a..d4b8448d 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ resdb/ 100*_db/ gmon.out .history/ -*_db/ \ No newline at end of file +*_db/ +resdb_data_*/* \ No newline at end of file diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index 0a7401bc..71c228a0 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -302,6 +302,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, std::unique_ptr<BatchUserResponse> response; global_stats_->GetTransactionDetails(*batch_request_p); + uint64_t seq = request->seq(); + global_stats_->RecordExecuteStart(seq); if (transaction_manager_ && need_execute) { if (execute_thread_num_ == 1) { response = transaction_manager_->ExecuteBatchWithSeq(request->seq(), @@ -354,6 +356,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, if (response == nullptr) { response = std::make_unique<BatchUserResponse>(); } + global_stats_->RecordExecuteEnd(seq); global_stats_->IncTotalRequest(batch_request_p->user_requests_size()); response->set_proxy_id(batch_request_p->proxy_id()); response->set_createtime(batch_request_p->createtime() + @@ -363,6 +366,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, response->set_seq(request->seq()); if (post_exec_func_) { post_exec_func_(std::move(request), std::move(response)); + global_stats_->RecordResponseSent(seq); } global_stats_->IncExecuteDone(); diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 463d0161..f8ab3d9d 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -263,14 +263,16 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, } return ret; } - // global_stats_->IncPrepare(); + global_stats_->IncPrepare(); + uint64_t seq = request->seq(); + int sender_id = request->sender_id(); + global_stats_->RecordPrepareRecv(seq, sender_id); std::unique_ptr<Request> commit_request = resdb::NewRequest( Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id()); commit_request->mutable_data_signature()->Clear(); // Add request to message_manager. // If it has received enough same requests(2f+1), broadcast the commit // message. - uint64_t seq = request->seq(); CollectorResultCode ret = message_manager_->AddConsensusMsg(context->signature, std::move(request)); if (ret == CollectorResultCode::STATE_CHANGED) { @@ -303,11 +305,13 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, return -2; } uint64_t seq = request->seq(); + int sender_id = request->sender_id(); if (request->is_recovery()) { return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } - // global_stats_->IncCommit(); + global_stats_->IncCommit(); + global_stats_->RecordCommitRecv(seq, sender_id); // Add request to message_manager. // If it has received enough same requests(2f+1), message manager will // commit the request. diff --git a/platform/networkstrate/service_network.cpp b/platform/networkstrate/service_network.cpp index 1e69fd25..0bc60579 100644 --- a/platform/networkstrate/service_network.cpp +++ b/platform/networkstrate/service_network.cpp @@ -26,6 +26,7 @@ #include "platform/common/network/tcp_socket.h" #include "platform/proto/broadcast.pb.h" +#include "platform/proto/resdb.pb.h" namespace resdb { @@ -73,6 +74,17 @@ void ServiceNetwork::AcceptorHandler(const char* buffer, size_t data_len) { item->data = std::move(sub_request_info); // LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data // len:"<<item->data->data_len; + + // Try to extract seq from request for timeline tracking + try { + Request request; + if (request.ParseFromArray(sub_data.data(), sub_data.size())) { + global_stats_->RecordNetworkRecv(request.seq()); + } + } catch (...) { + // Ignore parse errors, seq extraction is optional + } + global_stats_->ServerCall(); input_queue_.Push(std::move(item)); } diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD index 92328f16..042668e1 100644 --- a/platform/statistic/BUILD +++ b/platform/statistic/BUILD @@ -26,6 +26,10 @@ cc_library( name = "stats", srcs = ["stats.cpp"], hdrs = ["stats.h"], + copts = select({ + "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"], + "//conditions:default": [], + }), deps = [ ":prometheus_handler", "//common:asio", @@ -38,6 +42,7 @@ cc_library( "//proto/kv:kv_cc_proto", "//third_party:crow", "//third_party:prometheus", + "//third_party:leveldb", ], ) diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 21f1524f..5b4e3aec 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -22,8 +22,15 @@ #include <glog/logging.h> #include <ctime> +#include <sstream> +#include <fstream> +#include <sys/stat.h> +#include <errno.h> #include "common/utils/utils.h" +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/write_batch.h" #include "proto/kv/kv.pb.h" namespace asio = boost::asio; @@ -124,22 +131,99 @@ void Stats::CrowRoute() { crow::SimpleApp app; while (!stop_) { try { + //Deprecated CROW_ROUTE(app, "/consensus_data") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 1"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body = consensus_history_.dump(); - res.end(); - }); + .methods("GET"_method)( + [this](const crow::request& req, crow::response& res) { + LOG(ERROR) << "API 1"; + res.set_header("Access-Control-Allow-Origin", "*"); + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", + "Content-Type, Authorization"); + + int limit = 10; + int page = 1; + + std::string url = req.url; + size_t query_pos = url.find('?'); + if (query_pos != std::string::npos) { + std::string query_string = url.substr(query_pos + 1); + std::istringstream iss(query_string); + std::string param; + while (std::getline(iss, param, '&')) { + size_t eq_pos = param.find('='); + if (eq_pos != std::string::npos) { + std::string key = param.substr(0, eq_pos); + std::string value = param.substr(eq_pos + 1); + if (key == "limit") { + try { + limit = std::stoi(value); + if (limit <= 0) limit = 10; + } catch (...) { + limit = 10; + } + } else if (key == "page") { + try { + page = std::stoi(value); + if (page <= 0) page = 1; + } catch (...) { + page = 1; + } + } + } + } + } + + nlohmann::json result; + int total_count = 0; + int offset = (page - 1) * limit; + int current_index = 0; + int items_collected = 0; + + if (summary_db_) { + std::lock_guard<std::mutex> lock(summary_db_mutex_); + leveldb::Iterator* it = + summary_db_->NewIterator(leveldb::ReadOptions()); + + // Count only non-timeline entries + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + if (key.find("timeline_") != 0) { + total_count++; + } + } + + // Iterate and collect only non-timeline entries + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + // Skip timeline entries + if (key.find("timeline_") == 0) { + continue; + } + + if (current_index >= offset && items_collected < limit) { + try { + result[key] = + nlohmann::json::parse(it->value().ToString()); + items_collected++; + } catch (...) { + res.code = 500; + result["error"] = "Failed to parse transaction data"; + delete it; + res.body = result.dump(); + res.end(); + return; + } + } + current_index++; + } + delete it; + } + + res.body = result.dump(); + res.end(); + }); CROW_ROUTE(app, "/get_status") .methods("GET"_method)([this](const crow::request& req, crow::response& res) { @@ -168,11 +252,12 @@ void Stats::CrowRoute() { "Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers + res.body = "Not Enabled"; // Send your response if (enable_faulty_switch_) { make_faulty_.store(!make_faulty_.load()); + res.body = "Success"; } - res.body = "Success"; res.end(); }); CROW_ROUTE(app, "/transaction_data") @@ -210,6 +295,126 @@ void Stats::CrowRoute() { mem_view_json.clear(); res.end(); }); + CROW_ROUTE(app, "/consensus_data/<int>") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res, int txn_number) { + LOG(ERROR) << "API 5: Get transaction " << txn_number; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + nlohmann::json result; + if (summary_db_) { + std::lock_guard<std::mutex> lock(summary_db_mutex_); + std::string txn_key = std::to_string(txn_number); + std::string value; + leveldb::Status status = + summary_db_->Get(leveldb::ReadOptions(), txn_key, &value); + + if (status.ok() && !value.empty()) { + try { + result[txn_key] = nlohmann::json::parse(value); + } catch (...) { + res.code = 500; + result["error"] = "Failed to parse transaction data"; + } + } else { + res.code = 404; + result["error"] = "Transaction not found"; + result["txn_number"] = txn_number; + } + } else { + res.code = 503; + result["error"] = "Storage not initialized"; + } + + res.body = result.dump(); + res.end(); + }); + + CROW_ROUTE(app, "/transaction_timeline/<int>") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res, int txn_number) { + LOG(ERROR) << "API 6: Get transaction timeline " << txn_number; + res.set_header("Access-Control-Allow-Origin", "*"); + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", + "Content-Type, Authorization"); + + nlohmann::json result; + result["txn_number"] = txn_number; + + if (!summary_db_) { + res.code = 503; + result["error"] = "Storage not initialized"; + res.body = result.dump(); + res.end(); + return; + } + + std::lock_guard<std::mutex> lock(summary_db_mutex_); + std::string txn_key = std::to_string(txn_number); + std::string timeline_key = "timeline_" + txn_key; + + std::string txn_value; + leveldb::Status status = + summary_db_->Get(leveldb::ReadOptions(), txn_key, &txn_value); + + if (status.ok() && !txn_value.empty()) { + try { + nlohmann::json txn_summary = nlohmann::json::parse(txn_value); + nlohmann::json txn_details; + if (txn_summary.contains("txn_commands")) { + txn_details["txn_commands"] = txn_summary["txn_commands"]; + } + if (txn_summary.contains("txn_keys")) { + txn_details["txn_keys"] = txn_summary["txn_keys"]; + } + if (txn_summary.contains("txn_values")) { + txn_details["txn_values"] = txn_summary["txn_values"]; + } + if (txn_summary.contains("propose_pre_prepare_time")) { + txn_details["propose_pre_prepare_time"] = + txn_summary["propose_pre_prepare_time"]; + } + if (txn_summary.contains("prepare_time")) { + txn_details["prepare_time"] = txn_summary["prepare_time"]; + } + if (txn_summary.contains("commit_time")) { + txn_details["commit_time"] = txn_summary["commit_time"]; + } + if (txn_summary.contains("execution_time")) { + txn_details["execution_time"] = txn_summary["execution_time"]; + } + result["transaction_details"] = txn_details; + } catch (...) { + LOG(ERROR) << "Failed to parse transaction summary for txn: " + << txn_number; + } + } + + std::string timeline_value; + status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, &timeline_value); + + if (status.ok() && !timeline_value.empty()) { + try { + nlohmann::json events_array = nlohmann::json::parse(timeline_value); + result["timeline"] = events_array; + } catch (...) { + LOG(ERROR) << "Failed to parse timeline for txn: " << txn_number; + } + } else { + LOG(ERROR) << "Timeline not found for txn: " << txn_number; + } + + res.body = result.dump(); + res.end(); + }); app.port(8500 + transaction_summary_.port).multithreaded().run(); sleep(1); } catch (const std::exception& e) { @@ -233,6 +438,27 @@ void Stats::SetProps(int replica_id, std::string ip, int port, enable_resview = resview_flag; enable_faulty_switch_ = faulty_flag; if (resview_flag) { + // Single data directory for both summaries and timeline + std::string data_dir = "./resdb_data_" + std::to_string(port); + int mkdir_result = mkdir(data_dir.c_str(), 0755); + if (mkdir_result != 0 && errno != EEXIST) { + LOG(ERROR) << "Failed to create data directory: " << data_dir; + } + + // Initialize LevelDB for both transaction summaries and timeline events + std::string summary_path = data_dir + "/summaries"; + leveldb::Options options; + options.create_if_missing = true; + leveldb::DB* db = nullptr; + leveldb::Status status = leveldb::DB::Open(options, summary_path, &db); + if (status.ok()) { + summary_db_.reset(db); + LOG(INFO) << "Initialized LevelDB storage at: " << summary_path; + } else { + LOG(ERROR) << "Failed to open LevelDB at " << summary_path << ": " + << status.ToString(); + } + crow_thread_ = std::thread(&Stats::CrowRoute, this); } } @@ -344,8 +570,63 @@ void Stats::SendSummary() { summary_json_["ext_cache_hit_ratio"] = transaction_summary_.ext_cache_hit_ratio_; - consensus_history_[std::to_string(transaction_summary_.txn_number)] = - summary_json_; + + std::string txn_key = std::to_string(transaction_summary_.txn_number); + + if (summary_db_) { + std::lock_guard<std::mutex> lock(summary_db_mutex_); + + // Write transaction summary + leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key, + summary_json_.dump()); + if (!status.ok()) { + LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key + << ": " << status.ToString(); + } + + // Batch write all buffered timeline events for this transaction + { + std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_); + auto it = timeline_buffer_.find(transaction_summary_.txn_number); + if (it != timeline_buffer_.end() && !it->second.empty()) { + std::string timeline_key = "timeline_" + txn_key; + + // Read existing timeline events if they exist, then merge + nlohmann::json events_array; + std::string existing_value; + leveldb::Status read_status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, &existing_value); + if (read_status.ok() && !existing_value.empty()) { + try { + events_array = nlohmann::json::parse(existing_value); + // Ensure it's an array + if (!events_array.is_array()) { + events_array = nlohmann::json::array(); + } + } catch (...) { + // If parsing fails, start with empty array + events_array = nlohmann::json::array(); + } + } else { + events_array = nlohmann::json::array(); + } + + // Append new events to existing array + for (const auto& event : it->second) { + events_array.push_back(event); + } + + // Write merged timeline events back to LevelDB + status = summary_db_->Put(leveldb::WriteOptions(), timeline_key, + events_array.dump()); + if (!status.ok()) { + LOG(ERROR) << "Failed to write timeline to storage for txn: " << txn_key + << ": " << status.ToString(); + } + // Clean up buffer for this txn + timeline_buffer_.erase(it); + } + } + } LOG(ERROR) << summary_json_.dump(); @@ -364,6 +645,76 @@ void Stats::SendSummary() { summary_json_.clear(); } +void Stats::WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id) { + if (!enable_resview) { + return; + } + + // Buffer timeline events in memory - written to LevelDB in batch during SendSummary() + std::lock_guard<std::mutex> lock(timeline_buffer_mutex_); + + // Create new event + nlohmann::json event; + event["timestamp"] = std::chrono::system_clock::now().time_since_epoch().count(); + event["phase"] = phase; + if (sender_id >= 0) { + event["sender_id"] = sender_id; + } + + // Append to in-memory buffer for this seq + timeline_buffer_[seq].push_back(event); +} + +void Stats::RecordNetworkRecv(uint64_t seq) { + WriteTimelineEvent(seq, "network_recv"); +} + +void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) { + WriteTimelineEvent(seq, "prepare_recv", sender_id); +} + +void Stats::RecordCommitRecv(uint64_t seq, int sender_id) { + WriteTimelineEvent(seq, "commit_recv", sender_id); +} + +void Stats::RecordExecuteStart(uint64_t seq) { + WriteTimelineEvent(seq, "execute_start"); +} + +void Stats::RecordExecuteEnd(uint64_t seq) { + WriteTimelineEvent(seq, "execute_end"); +} + +void Stats::RecordResponseSent(uint64_t seq) { + WriteTimelineEvent(seq, "response_sent"); +} + +void Stats::CleanupOldTimelineEntries() { + // Periodically clean up old buffer entries to prevent memory leaks + // Keep only entries within the last 10000 sequence numbers + std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_); + + if (timeline_buffer_.empty()) { + return; + } + + // Get the highest sequence number we've seen + uint64_t max_seq = timeline_buffer_.rbegin()->first; + uint64_t cleanup_threshold = (max_seq > 10000) ? (max_seq - 10000) : 0; + + // Remove all entries below the threshold + auto it = timeline_buffer_.begin(); + while (it != timeline_buffer_.end() && it->first < cleanup_threshold) { + it = timeline_buffer_.erase(it); + } + + size_t buffer_size = timeline_buffer_.size(); + if (buffer_size > 1000) { + LOG(WARNING) << "Timeline buffer size is large: " << buffer_size + << " entries. This may indicate transactions not completing."; + } +} + void Stats::MonitorGlobal() { LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_; @@ -398,6 +749,10 @@ void Stats::MonitorGlobal() { while (!stop_) { sleep(monitor_sleep_time_); time += monitor_sleep_time_; + + // Periodically clean up old timeline buffer entries to prevent memory leaks + CleanupOldTimelineEntries(); + seq_fail = seq_fail_; socket_recv = socket_recv_; client_call = client_call_; @@ -529,8 +884,10 @@ void Stats::IncPrepare() { prometheus_->Inc(PREPARE, 1); } num_prepare_++; - transaction_summary_.prepare_message_count_times_list.push_back( - std::chrono::system_clock::now()); + if (enable_resview) { + transaction_summary_.prepare_message_count_times_list.push_back( + std::chrono::system_clock::now()); + } } void Stats::IncCommit() { @@ -538,8 +895,10 @@ void Stats::IncCommit() { prometheus_->Inc(COMMIT, 1); } num_commit_++; - transaction_summary_.commit_message_count_times_list.push_back( - std::chrono::system_clock::now()); + if (enable_resview) { + transaction_summary_.commit_message_count_times_list.push_back( + std::chrono::system_clock::now()); + } } void Stats::IncPendingExecute() { pending_execute_++; } diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 849c83d1..a2524fc1 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -23,6 +23,7 @@ #include <chrono> #include <future> +#include <map> #include <nlohmann/json.hpp> #include "boost/asio.hpp" @@ -33,6 +34,11 @@ #include "proto/kv/kv.pb.h" #include "sys/resource.h" +// Forward declaration for LevelDB +namespace leveldb { +class DB; +} + namespace asio = boost::asio; namespace beast = boost::beast; using tcp = asio::ip::tcp; @@ -91,6 +97,15 @@ class Stats { bool IsFaulty(); void ChangePrimary(int primary_id); + // Timeline event recording (disk-based, only when resview enabled) + void RecordNetworkRecv(uint64_t seq); + void RecordPrepareRecv(uint64_t seq, int sender_id); + void RecordCommitRecv(uint64_t seq, int sender_id); + void RecordExecuteStart(uint64_t seq); + void RecordExecuteEnd(uint64_t seq); + void RecordResponseSent(uint64_t seq); + void CleanupOldTimelineEntries(); // Prevent timeline buffer memory leak + void AddLatency(uint64_t run_time); void Monitor(); @@ -159,9 +174,20 @@ class Stats { std::atomic<uint64_t> prev_num_prepare_; std::atomic<uint64_t> prev_num_commit_; nlohmann::json summary_json_; - nlohmann::json consensus_history_; std::unique_ptr<PrometheusHandler> prometheus_; + + std::unique_ptr<leveldb::DB> summary_db_; + std::mutex summary_db_mutex_; + + // In-memory timeline event buffer (per-seq) - batched writes to LevelDB + std::map<uint64_t, std::vector<nlohmann::json>> timeline_buffer_; + std::mutex timeline_buffer_mutex_; + + // Timeline directory for per-transaction event logs + std::string timeline_dir_; + std::mutex timeline_mutex_; + void WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id = -1); }; } // namespace resdb
