This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 6d4cb97dc209abf32b87f58ef2cf446483053c8e Author: bchou9 <[email protected]> AuthorDate: Thu Jan 8 06:34:27 2026 +0000 per sequence telemetry tracking --- .../consensus/execution/transaction_executor.cpp | 2 +- platform/consensus/ordering/pbft/commitment.cpp | 14 +- platform/statistic/stats.cpp | 1444 ++++++++++++-------- platform/statistic/stats.h | 400 ++++-- 4 files changed, 1118 insertions(+), 742 deletions(-) diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index bb7cf6be..3e30d3f6 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -205,7 +205,7 @@ void TransactionExecutor::OrderMessage() { } void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) { - global_stats_->IncCommit(); + global_stats_->IncCommit(message->seq()); message->set_commit_time(GetCurrentTime()); execute_queue_.Push(std::move(message)); } diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 5a970dd6..7ebb769f 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -135,7 +135,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, return -2; } - global_stats_->RecordStateTime("request"); + global_stats_->RecordStateTime(*seq, "request"); user_request->set_type(Request::TYPE_PRE_PREPARE); user_request->set_current_view(message_manager_->GetCurrentView()); @@ -229,7 +229,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context, } global_stats_->IncPropose(); - global_stats_->RecordStateTime("pre-prepare"); + global_stats_->RecordStateTime(request->seq(), "pre-prepare"); std::unique_ptr<Request> prepare_request = resdb::NewRequest( Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id()); prepare_request->clear_data(); @@ -263,9 +263,9 @@ int 
Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, } return ret; } - global_stats_->IncPrepare(); uint64_t seq = request->seq(); int sender_id = request->sender_id(); + global_stats_->IncPrepare(seq); std::unique_ptr<Request> commit_request = resdb::NewRequest( Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id()); commit_request->mutable_data_signature()->Clear(); @@ -289,7 +289,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, // LOG(ERROR) << "sign hash" // << commit_request->data_signature().DebugString(); } - global_stats_->RecordStateTime("prepare"); + global_stats_->RecordStateTime(seq, "prepare"); replica_communicator_->BroadCast(*commit_request); } return ret == CollectorResultCode::INVALID ? -2 : 0; @@ -309,7 +309,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } - global_stats_->IncCommit(); + global_stats_->IncCommit(seq); // Add request to message_manager. // If it has received enough same requests(2f+1), message manager will // commit the request. @@ -318,7 +318,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, if (ret == CollectorResultCode::STATE_CHANGED) { // LOG(ERROR)<<request->data().size(); // global_stats_->GetTransactionDetails(request->data()); - global_stats_->RecordStateTime("commit"); + global_stats_->RecordStateTime(seq, "commit"); } return ret == CollectorResultCode::INVALID ? 
-2 : 0; } @@ -331,7 +331,7 @@ int Commitment::PostProcessExecutedMsg() { if (batch_resp == nullptr) { continue; } - global_stats_->SendSummary(); + global_stats_->SendSummary(batch_resp->seq()); Request request; request.set_hash(batch_resp->hash()); request.set_seq(batch_resp->seq()); diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 1c511b3b..8957e733 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -17,589 +17,861 @@ * under the License. */ - #include "platform/statistic/stats.h" - - #include <glog/logging.h> - - #include <ctime> - - #include "common/utils/utils.h" - #include "proto/kv/kv.pb.h" - - namespace asio = boost::asio; - namespace beast = boost::beast; - using tcp = asio::ip::tcp; - - namespace resdb { - - std::mutex g_mutex; - Stats* Stats::GetGlobalStats(int seconds) { - std::unique_lock<std::mutex> lk(g_mutex); - static Stats stats(seconds); - return &stats; - } // gets a singelton instance of Stats Class - - Stats::Stats(int sleep_time) { - monitor_sleep_time_ = sleep_time; - #ifdef TEST_MODE - monitor_sleep_time_ = 1; - #endif - num_call_ = 0; - num_commit_ = 0; - run_time_ = 0; - run_call_ = 0; - run_call_time_ = 0; - server_call_ = 0; - server_process_ = 0; - run_req_num_ = 0; - run_req_run_time_ = 0; - seq_gap_ = 0; - total_request_ = 0; - total_geo_request_ = 0; - geo_request_ = 0; - - stop_ = false; - begin_ = false; - - socket_recv_ = 0; - broad_cast_msg_ = 0; - send_broad_cast_msg_ = 0; - - prometheus_ = nullptr; - global_thread_ = - std::thread(&Stats::MonitorGlobal, this); // pass by reference - - transaction_summary_.port = -1; - - // Setup websocket here - make_faulty_.store(false); - transaction_summary_.request_pre_prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.commit_state_time = - std::chrono::system_clock::time_point::min(); - 
transaction_summary_.execution_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.txn_number = 0; - } - - void Stats::Stop() { stop_ = true; } - - Stats::~Stats() { - stop_ = true; - if (global_thread_.joinable()) { - global_thread_.join(); - } - if (enable_resview && crow_thread_.joinable()) { - crow_thread_.join(); - } - } - - int64_t GetRSS() { - int64_t rss = 0; - FILE* fp = NULL; - if ((fp = fopen("/proc/self/statm", "r")) == NULL) { - return 0; - } - - uint64_t size, resident, share, text, lib, data, dt; - if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, &text, - &lib, &data, &dt) != 7) { - fclose(fp); - return 0; - } - fclose(fp); - - int64_t page_size = sysconf(_SC_PAGESIZE); - rss = resident * page_size; - - // Convert to MB - rss = rss / (1024 * 1024); - - return rss; - } - - void Stats::CrowRoute() { - crow::SimpleApp app; - while (!stop_) { - try { - CROW_ROUTE(app, "/consensus_data") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 1"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body = consensus_history_.dump(); - res.end(); - }); - CROW_ROUTE(app, "/get_status") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 2"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body = IsFaulty() ? 
"Faulty" : "Not Faulty"; - res.end(); - }); - CROW_ROUTE(app, "/make_faulty") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 3"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - if (enable_faulty_switch_) { - make_faulty_.store(!make_faulty_.load()); - } - res.body = "Success"; - res.end(); - }); - CROW_ROUTE(app, "/transaction_data") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 4"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - nlohmann::json mem_view_json; - int status = - getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_); - if (status == 0) { - mem_view_json["resident_set_size"] = GetRSS(); - mem_view_json["max_resident_set_size"] = - transaction_summary_.process_stats_.ru_maxrss; - mem_view_json["num_reads"] = - transaction_summary_.process_stats_.ru_inblock; - mem_view_json["num_writes"] = - transaction_summary_.process_stats_.ru_oublock; - } - - mem_view_json["ext_cache_hit_ratio"] = - transaction_summary_.ext_cache_hit_ratio_; - mem_view_json["level_db_stats"] = - transaction_summary_.level_db_stats_; - mem_view_json["level_db_approx_mem_size"] = - transaction_summary_.level_db_approx_mem_size_; - res.body = mem_view_json.dump(); - mem_view_json.clear(); - res.end(); - }); - app.port(8500 + transaction_summary_.port).multithreaded().run(); - sleep(1); - } catch (const std::exception& e) { - } - } - 
app.stop(); - } - - bool Stats::IsFaulty() { return make_faulty_.load(); } - - void Stats::ChangePrimary(int primary_id) { - transaction_summary_.primary_id = primary_id; - make_faulty_.store(false); - } - - void Stats::SetProps(int replica_id, std::string ip, int port, - bool resview_flag, bool faulty_flag) { - transaction_summary_.replica_id = replica_id; - transaction_summary_.ip = ip; - transaction_summary_.port = port; - enable_resview = resview_flag; - enable_faulty_switch_ = faulty_flag; - if (resview_flag) { - crow_thread_ = std::thread(&Stats::CrowRoute, this); - } - } - - void Stats::SetPrimaryId(int primary_id) { - transaction_summary_.primary_id = primary_id; - } - - void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio, - std::string level_db_stats, - std::string level_db_approx_mem_size) { - transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio; - transaction_summary_.level_db_stats_ = level_db_stats; - transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size; - } - - void Stats::RecordStateTime(std::string state) { - if (!enable_resview) { - return; - } - if (state == "request" || state == "pre-prepare") { - transaction_summary_.request_pre_prepare_state_time = - std::chrono::system_clock::now(); - } else if (state == "prepare") { - transaction_summary_.prepare_state_time = std::chrono::system_clock::now(); - } else if (state == "commit") { - transaction_summary_.commit_state_time = std::chrono::system_clock::now(); - } - } - - void Stats::GetTransactionDetails(BatchUserRequest batch_request) { - if (!enable_resview) { - return; - } - transaction_summary_.txn_number = batch_request.seq(); - transaction_summary_.txn_command.clear(); - transaction_summary_.txn_key.clear(); - transaction_summary_.txn_value.clear(); - for (auto& sub_request : batch_request.user_requests()) { - KVRequest kv_request; - if (!kv_request.ParseFromString(sub_request.request().data())) { - break; - } - if (kv_request.cmd() == KVRequest::SET) 
{ - transaction_summary_.txn_command.push_back("SET"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(kv_request.value()); - } else if (kv_request.cmd() == KVRequest::GET) { - transaction_summary_.txn_command.push_back("GET"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(""); - } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { - transaction_summary_.txn_command.push_back("GETALLVALUES"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(""); - } else if (kv_request.cmd() == KVRequest::GETRANGE) { - transaction_summary_.txn_command.push_back("GETRANGE"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(kv_request.value()); - } - } - } - - void Stats::SendSummary() { - if (!enable_resview) { - return; - } - transaction_summary_.execution_time = std::chrono::system_clock::now(); - - // Convert Transaction Summary to JSON - summary_json_["replica_id"] = transaction_summary_.replica_id; - summary_json_["ip"] = transaction_summary_.ip; - summary_json_["port"] = transaction_summary_.port; - summary_json_["primary_id"] = transaction_summary_.primary_id; - summary_json_["propose_pre_prepare_time"] = - transaction_summary_.request_pre_prepare_state_time.time_since_epoch() - .count(); - summary_json_["prepare_time"] = - transaction_summary_.prepare_state_time.time_since_epoch().count(); - summary_json_["commit_time"] = - transaction_summary_.commit_state_time.time_since_epoch().count(); - summary_json_["execution_time"] = - transaction_summary_.execution_time.time_since_epoch().count(); - for (size_t i = 0; - i < transaction_summary_.prepare_message_count_times_list.size(); i++) { - summary_json_["prepare_message_timestamps"].push_back( - transaction_summary_.prepare_message_count_times_list[i] - .time_since_epoch() - .count()); - } - for (size_t i = 
0; - i < transaction_summary_.commit_message_count_times_list.size(); i++) { - summary_json_["commit_message_timestamps"].push_back( - transaction_summary_.commit_message_count_times_list[i] - .time_since_epoch() - .count()); - } - summary_json_["txn_number"] = transaction_summary_.txn_number; - for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) { - summary_json_["txn_commands"].push_back( - transaction_summary_.txn_command[i]); - } - for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) { - summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]); - } - for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) { - summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]); - } - - summary_json_["ext_cache_hit_ratio"] = - transaction_summary_.ext_cache_hit_ratio_; - consensus_history_[std::to_string(transaction_summary_.txn_number)] = - summary_json_; - - LOG(ERROR) << summary_json_.dump(); - - // Reset Transaction Summary Parameters - transaction_summary_.request_pre_prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.commit_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.execution_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_message_count_times_list.clear(); - transaction_summary_.commit_message_count_times_list.clear(); - - summary_json_.clear(); - } - - void Stats::MonitorGlobal() { - LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_; - - uint64_t seq_fail = 0; - uint64_t client_call = 0, socket_recv = 0; - uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0, - pending_execute = 0, execute = 0, execute_done = 0; - uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0; - uint64_t send_broad_cast_msg_per_rep = 0; - uint64_t server_call = 0, 
server_process = 0; - uint64_t seq_gap = 0; - uint64_t total_request = 0, total_geo_request = 0, geo_request = 0; - - // ====== for client proxy ====== - uint64_t run_req_num = 0, run_req_run_time = 0; - - uint64_t last_run_req_num = 0, last_run_req_run_time = 0; - // ============================= - - uint64_t last_seq_fail = 0; - uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0, - last_num_commit = 0; - uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0; - uint64_t last_client_call = 0, last_socket_recv = 0; - uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0; - uint64_t last_send_broad_cast_msg_per_rep = 0; - uint64_t last_server_call = 0, last_server_process = 0; - uint64_t last_total_request = 0, last_total_geo_request = 0, - last_geo_request = 0; - uint64_t time = 0; - - while (!stop_) { - sleep(monitor_sleep_time_); - time += monitor_sleep_time_; - seq_fail = seq_fail_; - socket_recv = socket_recv_; - client_call = client_call_; - num_client_req = num_client_req_; - num_propose = num_propose_; - num_prepare = num_prepare_; - num_commit = num_commit_; - pending_execute = pending_execute_; - execute = execute_; - execute_done = execute_done_; - broad_cast_msg = broad_cast_msg_; - send_broad_cast_msg = send_broad_cast_msg_; - send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_; - server_call = server_call_; - server_process = server_process_; - seq_gap = seq_gap_; - total_request = total_request_; - total_geo_request = total_geo_request_; - geo_request = geo_request_; - - run_req_num = run_req_num_; - run_req_run_time = run_req_run_time_; - - LOG(ERROR) << "=========== monitor =========\n" - << "server call:" << server_call - last_server_call - << " server process:" << server_process - last_server_process - << " socket recv:" << socket_recv - last_socket_recv - << " " - "client call:" - << client_call - last_client_call - << " " - "client req:" - << num_client_req - last_num_client_req - << 
" " - "broad_cast:" - << broad_cast_msg - last_broad_cast_msg - << " " - "send broad_cast:" - << send_broad_cast_msg - last_send_broad_cast_msg - << " " - "per send broad_cast:" - << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep - << " " - "propose:" - << num_propose - last_num_propose - << " " - "prepare:" - << (num_prepare - last_num_prepare) - << " " - "commit:" - << (num_commit - last_num_commit) - << " " - "pending execute:" - << pending_execute - last_pending_execute - << " " - "execute:" - << execute - last_execute - << " " - "execute done:" - << execute_done - last_execute_done << " seq gap:" << seq_gap - << " total request:" << total_request - last_total_request - << " txn:" << (total_request - last_total_request) / 5 - << " total geo request:" - << total_geo_request - last_total_geo_request - << " total geo request per:" - << (total_geo_request - last_total_geo_request) / 5 - << " geo request:" << (geo_request - last_geo_request) - << " " - "seq fail:" - << seq_fail - last_seq_fail << " time:" << time - << " " - "\n--------------- monitor ------------"; - if (run_req_num - last_run_req_num > 0) { - LOG(ERROR) << " req client latency:" - << static_cast<double>(run_req_run_time - - last_run_req_run_time) / - (run_req_num - last_run_req_num) / 1000000000.0; - } - - last_seq_fail = seq_fail; - last_socket_recv = socket_recv; - last_client_call = client_call; - last_num_client_req = num_client_req; - last_num_propose = num_propose; - last_num_prepare = num_prepare; - last_num_commit = num_commit; - last_pending_execute = pending_execute; - last_execute = execute; - last_execute_done = execute_done; - - last_broad_cast_msg = broad_cast_msg; - last_send_broad_cast_msg = send_broad_cast_msg; - last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep; - - last_server_call = server_call; - last_server_process = server_process; - - last_run_req_num = run_req_num; - last_run_req_run_time = run_req_run_time; - last_total_request = total_request; 
- last_total_geo_request = total_geo_request; - last_geo_request = geo_request; - } - } - - void Stats::IncClientCall() { - if (prometheus_) { - prometheus_->Inc(CLIENT_CALL, 1); - } - client_call_++; - } - - void Stats::IncClientRequest() { - if (prometheus_) { - prometheus_->Inc(CLIENT_REQ, 1); - } - num_client_req_++; - } - - void Stats::IncPropose() { - if (prometheus_) { - prometheus_->Inc(PROPOSE, 1); - } - num_propose_++; - } - - void Stats::IncPrepare() { - if (prometheus_) { - prometheus_->Inc(PREPARE, 1); - } - num_prepare_++; - transaction_summary_.prepare_message_count_times_list.push_back( - std::chrono::system_clock::now()); - } - - void Stats::IncCommit() { - if (prometheus_) { - prometheus_->Inc(COMMIT, 1); - } - num_commit_++; - transaction_summary_.commit_message_count_times_list.push_back( - std::chrono::system_clock::now()); - } - - void Stats::IncPendingExecute() { pending_execute_++; } - - void Stats::IncExecute() { execute_++; } - - void Stats::IncExecuteDone() { - if (prometheus_) { - prometheus_->Inc(EXECUTE, 1); - } - execute_done_++; - } - - void Stats::BroadCastMsg() { - if (prometheus_) { - prometheus_->Inc(BROAD_CAST, 1); - } - broad_cast_msg_++; - } - - void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; } - - void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; } - - void Stats::SeqFail() { seq_fail_++; } - - void Stats::IncTotalRequest(uint32_t num) { - if (prometheus_) { - prometheus_->Inc(NUM_EXECUTE_TX, num); - } - total_request_ += num; - } - - void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; } - - void Stats::IncGeoRequest() { geo_request_++; } - - void Stats::ServerCall() { - if (prometheus_) { - prometheus_->Inc(SERVER_CALL_NAME, 1); - } - server_call_++; - } - - void Stats::ServerProcess() { - if (prometheus_) { - prometheus_->Inc(SERVER_PROCESS, 1); - } - server_process_++; - } - - void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; } - - void 
Stats::AddLatency(uint64_t run_time) { - run_req_num_++; - run_req_run_time_ += run_time; - } - - void Stats::SetPrometheus(const std::string& prometheus_address) { - prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address); - } - - } // namespace resdb \ No newline at end of file +#include "platform/statistic/stats.h" + +#include <glog/logging.h> + +#include <ctime> + +#include "common/utils/utils.h" +#include "proto/kv/kv.pb.h" + +namespace asio = boost::asio; +namespace beast = boost::beast; +using tcp = asio::ip::tcp; + +namespace resdb { + +std::mutex g_mutex; +Stats* Stats::GetGlobalStats(int seconds) { + std::unique_lock<std::mutex> lk(g_mutex); + static Stats stats(seconds); + return &stats; +} // gets a singelton instance of Stats Class + +Stats::Stats(int sleep_time) { + monitor_sleep_time_ = sleep_time; +#ifdef TEST_MODE + monitor_sleep_time_ = 1; +#endif + num_call_ = 0; + num_commit_ = 0; + run_time_ = 0; + run_call_ = 0; + run_call_time_ = 0; + server_call_ = 0; + server_process_ = 0; + run_req_num_ = 0; + run_req_run_time_ = 0; + seq_gap_ = 0; + total_request_ = 0; + total_geo_request_ = 0; + geo_request_ = 0; + + stop_ = false; + begin_ = false; + + socket_recv_ = 0; + broad_cast_msg_ = 0; + send_broad_cast_msg_ = 0; + + prometheus_ = nullptr; + global_thread_ = + std::thread(&Stats::MonitorGlobal, this); // pass by reference + + transaction_summary_.port = -1; + + // Setup websocket here + make_faulty_.store(false); + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.commit_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.execution_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.txn_number = 0; + + // Initialize static telemetry info + static_telemetry_info_.port = -1; +} + +void Stats::Stop() { stop_ = 
true; } + +Stats::~Stats() { + stop_ = true; + if (global_thread_.joinable()) { + global_thread_.join(); + } + if (enable_resview && crow_thread_.joinable()) { + crow_thread_.join(); + } +} + +int64_t GetRSS() { + int64_t rss = 0; + FILE* fp = NULL; + if ((fp = fopen("/proc/self/statm", "r")) == NULL) { + return 0; + } + + uint64_t size, resident, share, text, lib, data, dt; + if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, &text, + &lib, &data, &dt) != 7) { + fclose(fp); + return 0; + } + fclose(fp); + + int64_t page_size = sysconf(_SC_PAGESIZE); + rss = resident * page_size; + + // Convert to MB + rss = rss / (1024 * 1024); + + return rss; +} + +void Stats::CrowRoute() { + crow::SimpleApp app; + while (!stop_) { + try { + CROW_ROUTE(app, "/consensus_data") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 1"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + { + std::lock_guard<std::mutex> lock(consensus_history_mutex_); + res.body = consensus_history_.dump(); + } + res.end(); + }); + CROW_ROUTE(app, "/get_status") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 2"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + res.body = IsFaulty() ? 
"Faulty" : "Not Faulty"; + res.end(); + }); + CROW_ROUTE(app, "/make_faulty") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 3"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + if (enable_faulty_switch_) { + make_faulty_.store(!make_faulty_.load()); + } + res.body = "Success"; + res.end(); + }); + CROW_ROUTE(app, "/transaction_data") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 4"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + nlohmann::json mem_view_json; + int status = + getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_); + if (status == 0) { + mem_view_json["resident_set_size"] = GetRSS(); + mem_view_json["max_resident_set_size"] = + transaction_summary_.process_stats_.ru_maxrss; + mem_view_json["num_reads"] = + transaction_summary_.process_stats_.ru_inblock; + mem_view_json["num_writes"] = + transaction_summary_.process_stats_.ru_oublock; + } + + mem_view_json["ext_cache_hit_ratio"] = + transaction_summary_.ext_cache_hit_ratio_; + mem_view_json["level_db_stats"] = + transaction_summary_.level_db_stats_; + mem_view_json["level_db_approx_mem_size"] = + transaction_summary_.level_db_approx_mem_size_; + res.body = mem_view_json.dump(); + mem_view_json.clear(); + res.end(); + }); + app.port(8500 + transaction_summary_.port).multithreaded().run(); + sleep(1); + } catch (const std::exception& e) { + } + } + 
app.stop(); +} + +bool Stats::IsFaulty() { return make_faulty_.load(); } + +void Stats::ChangePrimary(int primary_id) { + transaction_summary_.primary_id = primary_id; + make_faulty_.store(false); +} + +void Stats::SetProps(int replica_id, std::string ip, int port, + bool resview_flag, bool faulty_flag) { + transaction_summary_.replica_id = replica_id; + transaction_summary_.ip = ip; + transaction_summary_.port = port; + // Also set static telemetry info for new transactions + static_telemetry_info_.replica_id = replica_id; + static_telemetry_info_.ip = ip; + static_telemetry_info_.port = port; + enable_resview = resview_flag; + enable_faulty_switch_ = faulty_flag; + if (resview_flag) { + crow_thread_ = std::thread(&Stats::CrowRoute, this); + } +} + +void Stats::SetPrimaryId(int primary_id) { + transaction_summary_.primary_id = primary_id; + static_telemetry_info_.primary_id = primary_id; +} + +void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio, + std::string level_db_stats, + std::string level_db_approx_mem_size) { + transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio; + transaction_summary_.level_db_stats_ = level_db_stats; + transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size; + static_telemetry_info_.ext_cache_hit_ratio_ = ext_cache_hit_ratio; + static_telemetry_info_.level_db_stats_ = level_db_stats; + static_telemetry_info_.level_db_approx_mem_size_ = level_db_approx_mem_size; +} + +size_t Stats::GetShardIndex(uint64_t seq) const { + return seq % kTelemetryShards; +} + +TransactionTelemetry& Stats::GetOrCreateTelemetry(uint64_t seq) { + size_t shard = GetShardIndex(seq); + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it == map.end()) { + // Initialize with static info - only copy static fields, preserve + // timestamps as min + TransactionTelemetry telemetry; + telemetry.replica_id = 
static_telemetry_info_.replica_id; + telemetry.primary_id = static_telemetry_info_.primary_id; + telemetry.ip = static_telemetry_info_.ip; + telemetry.port = static_telemetry_info_.port; + telemetry.ext_cache_hit_ratio_ = + static_telemetry_info_.ext_cache_hit_ratio_; + telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_; + telemetry.level_db_approx_mem_size_ = + static_telemetry_info_.level_db_approx_mem_size_; + telemetry.txn_number = seq; + // Timestamps remain at min() which is correct + it = map.emplace(seq, std::move(telemetry)).first; + } + return it->second; +} + +void Stats::RecordStateTime(std::string state) { + // Old method without seq - deprecated, but keep for backwards compatibility + // This uses the old single transaction_summary_ approach + if (!enable_resview) { + return; + } + // Temporary fix: protect access to transaction_summary_ + std::lock_guard<std::mutex> lock( + telemetry_mutex_[0]); // Use first shard for old code + if (state == "request" || state == "pre-prepare") { + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::now(); + } else if (state == "prepare") { + transaction_summary_.prepare_state_time = std::chrono::system_clock::now(); + } else if (state == "commit") { + transaction_summary_.commit_state_time = std::chrono::system_clock::now(); + } +} + +void Stats::RecordStateTime(uint64_t seq, std::string state) { + if (!enable_resview) { + return; + } + + size_t shard = GetShardIndex(seq); + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it == map.end()) { + // Initialize with static info - DO NOT reset existing entry + TransactionTelemetry telemetry; + telemetry.replica_id = static_telemetry_info_.replica_id; + telemetry.primary_id = static_telemetry_info_.primary_id; + telemetry.ip = static_telemetry_info_.ip; + telemetry.port = static_telemetry_info_.port; + telemetry.ext_cache_hit_ratio_ 
= + static_telemetry_info_.ext_cache_hit_ratio_; + telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_; + telemetry.level_db_approx_mem_size_ = + static_telemetry_info_.level_db_approx_mem_size_; + telemetry.txn_number = seq; + // Timestamps start at min() - correct + it = map.emplace(seq, std::move(telemetry)).first; + } + + // Entry exists - update timestamp only, don't touch other fields + auto& telemetry = it->second; + auto now = std::chrono::system_clock::now(); + + if (state == "request" || state == "pre-prepare") { + telemetry.request_pre_prepare_state_time = now; + } else if (state == "prepare") { + telemetry.prepare_state_time = now; + } else if (state == "commit") { + telemetry.commit_state_time = now; + } +}
+
+namespace {
+
+// Creates the telemetry record for a sequence number that has no entry yet.
+// Only the replica-wide fields are copied from `static_info`, and the record
+// is stamped with `seq`; the state timestamps keep whatever the
+// TransactionTelemetry default constructor assigned.
+TransactionTelemetry NewTelemetryForSeq(const TransactionTelemetry& static_info,
+                                        uint64_t seq) {
+  TransactionTelemetry telemetry;
+  telemetry.replica_id = static_info.replica_id;
+  telemetry.primary_id = static_info.primary_id;
+  telemetry.ip = static_info.ip;
+  telemetry.port = static_info.port;
+  telemetry.ext_cache_hit_ratio_ = static_info.ext_cache_hit_ratio_;
+  telemetry.level_db_stats_ = static_info.level_db_stats_;
+  telemetry.level_db_approx_mem_size_ = static_info.level_db_approx_mem_size_;
+  telemetry.txn_number = seq;
+  return telemetry;
+}
+
+}  // namespace
+
+// Stores the KV operations carried by `batch_request` in the telemetry entry
+// for batch_request.seq(), creating the entry when none exists yet.
+// Does nothing unless resview is enabled. Only txn_number and the three
+// txn_* vectors are written; timestamp fields are never modified here.
+void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+  if (!enable_resview) {
+    return;
+  }
+
+  const uint64_t seq = batch_request.seq();
+  const size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    it = map.emplace(seq, NewTelemetryForSeq(static_telemetry_info_, seq))
+             .first;
+  }
+
+  auto& telemetry = it->second;
+  telemetry.txn_number = seq;
+
+  // Replace, rather than append to, any details recorded by an earlier call.
+  telemetry.txn_command.clear();
+  telemetry.txn_key.clear();
+  telemetry.txn_value.clear();
+
+  for (const auto& sub_request : batch_request.user_requests()) {
+    KVRequest kv_request;
+    if (!kv_request.ParseFromString(sub_request.request().data())) {
+      // Stop at the first sub-request that does not decode; the ones already
+      // recorded are kept, so the three vectors stay index-aligned.
+      break;
+    }
+
+    // Commands other than the four below are not recorded.
+    switch (kv_request.cmd()) {
+      case KVRequest::SET:
+        telemetry.txn_command.push_back("SET");
+        telemetry.txn_key.push_back(kv_request.key());
+        telemetry.txn_value.push_back(kv_request.value());
+        break;
+      case KVRequest::GET:
+        telemetry.txn_command.push_back("GET");
+        telemetry.txn_key.push_back(kv_request.key());
+        telemetry.txn_value.push_back("");
+        break;
+      case KVRequest::GETALLVALUES:
+        telemetry.txn_command.push_back("GETALLVALUES");
+        telemetry.txn_key.push_back(kv_request.key());
+        telemetry.txn_value.push_back("");
+        break;
+      case KVRequest::GETRANGE:
+        telemetry.txn_command.push_back("GETRANGE");
+        telemetry.txn_key.push_back(kv_request.key());
+        telemetry.txn_value.push_back(kv_request.value());
+        break;
+      default:
+        break;
+    }
+  }
+}
+
+// Legacy, sequence-less summary kept for backwards compatibility.
+// Serializes the single transaction_summary_ into summary_json_, stores it in
+// consensus_history_ under its txn_number, logs it, then resets the timestamps
+// and message-time lists of transaction_summary_ and clears summary_json_.
+// Does nothing unless resview is enabled.
+void Stats::SendSummary() {
+  if (!enable_resview) {
+    return;
+  }
+  // The legacy state shares the mutex of the first telemetry shard.
+  std::lock_guard<std::mutex> lock(telemetry_mutex_[0]);
+
+  transaction_summary_.execution_time = std::chrono::system_clock::now();
+
+  summary_json_["replica_id"] = transaction_summary_.replica_id;
+  summary_json_["ip"] = transaction_summary_.ip;
+  summary_json_["port"] = transaction_summary_.port;
+  summary_json_["primary_id"] = transaction_summary_.primary_id;
+  summary_json_["propose_pre_prepare_time"] =
+      transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+          .count();
+  summary_json_["prepare_time"] =
+      transaction_summary_.prepare_state_time.time_since_epoch().count();
+  summary_json_["commit_time"] =
+      transaction_summary_.commit_state_time.time_since_epoch().count();
+  summary_json_["execution_time"] =
+      transaction_summary_.execution_time.time_since_epoch().count();
+
+  // The list-valued keys are only created when there is at least one element.
+  for (const auto& ts :
+       transaction_summary_.prepare_message_count_times_list) {
+    summary_json_["prepare_message_timestamps"].push_back(
+        ts.time_since_epoch().count());
+  }
+  for (const auto& ts : transaction_summary_.commit_message_count_times_list) {
+    summary_json_["commit_message_timestamps"].push_back(
+        ts.time_since_epoch().count());
+  }
+  summary_json_["txn_number"] = transaction_summary_.txn_number;
+  for (const auto& command : transaction_summary_.txn_command) {
+    summary_json_["txn_commands"].push_back(command);
+  }
+  for (const auto& key : transaction_summary_.txn_key) {
+    summary_json_["txn_keys"].push_back(key);
+  }
+  for (const auto& value : transaction_summary_.txn_value) {
+    summary_json_["txn_values"].push_back(value);
+  }
+
+  summary_json_["ext_cache_hit_ratio"] =
+      transaction_summary_.ext_cache_hit_ratio_;
+
+  {
+    std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
+    consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+        summary_json_;
+  }
+
+  LOG(ERROR) << summary_json_.dump();
+
+  // Reset the per-transaction parts so the next summary starts clean.
+  const auto unset = std::chrono::system_clock::time_point::min();
+  transaction_summary_.request_pre_prepare_state_time = unset;
+  transaction_summary_.prepare_state_time = unset;
+  transaction_summary_.commit_state_time = unset;
+  transaction_summary_.execution_time = unset;
+  transaction_summary_.prepare_message_count_times_list.clear();
+  transaction_summary_.commit_message_count_times_list.clear();
+
+  summary_json_.clear();
+}
+
+// Publishes the telemetry entry of `seq` into consensus_history_ and logs it.
+// Does nothing unless resview is enabled; logs a warning and returns when no
+// entry exists for `seq`. The entry itself is kept in the shard map so that
+// repeated calls and later telemetry updates for the same `seq` still work.
+void Stats::SendSummary(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+
+  const size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    LOG(WARNING) << "SendSummary called for unknown transaction: " << seq;
+    return;
+  }
+
+  auto& telemetry = it->second;
+
+  // execution_time is the only field written here; it must be set before the
+  // entry is serialized.
+  telemetry.execution_time = std::chrono::system_clock::now();
+
+  nlohmann::json summary_json = telemetry.to_json();
+
+  {
+    std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
+    const std::string seq_str = std::to_string(seq);
+    auto existing_it = consensus_history_.find(seq_str);
+    if (existing_it == consensus_history_.end()) {
+      // First summary for this sequence: store it as is.
+      consensus_history_[seq_str] = summary_json;
+    } else {
+      // A summary is already stored: keep it, and only fill in transaction
+      // details it is missing so complete data is never overwritten.
+      auto& existing = existing_it.value();
+      const bool new_has_details = !summary_json["txn_commands"].empty();
+      const bool existing_has_details = !existing["txn_commands"].empty();
+
+      if (new_has_details && !existing_has_details) {
+        existing["txn_commands"] = summary_json["txn_commands"];
+        existing["txn_keys"] = summary_json["txn_keys"];
+        existing["txn_values"] = summary_json["txn_values"];
+      }
+
+      // The execution time of the latest call always wins.
+      existing["execution_time"] = summary_json["execution_time"];
+    }
+  }
+
+  LOG(ERROR) << summary_json.dump();
+}
+
+// Periodically logs how much every global counter advanced since the previous
+// report. Loops until stop_ is set, sleeping monitor_sleep_time_ seconds
+// between reports.
+void Stats::MonitorGlobal() {
+  LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
+
+  uint64_t seq_fail = 0;
+  uint64_t client_call = 0, socket_recv = 0;
+  uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0,
+           pending_execute = 0, execute = 0, execute_done = 0;
+  uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
+  uint64_t send_broad_cast_msg_per_rep = 0;
+  uint64_t server_call = 0, server_process = 0;
+  uint64_t seq_gap = 0;
+  uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
+
+  // ====== for client proxy ======
+  uint64_t run_req_num = 0, run_req_run_time = 0;
+
+  uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
+  // =============================
+
+  uint64_t last_seq_fail = 0;
+  uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
+           last_num_commit = 0;
+  uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
+  uint64_t last_client_call = 0, last_socket_recv = 0;
+  uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
+  uint64_t last_send_broad_cast_msg_per_rep = 0;
+  uint64_t last_server_call = 0, last_server_process = 0;
+  uint64_t last_total_request = 0, last_total_geo_request = 0,
+           last_geo_request = 0;
+  uint64_t time = 0;
+
+  // The "txn" and "total geo request per" figures are deltas divided by the
+  // report interval. Use the configured interval instead of assuming the 5s
+  // default, and never divide by zero.
+  const uint64_t interval_sec =
+      monitor_sleep_time_ > 0 ? static_cast<uint64_t>(monitor_sleep_time_) : 1;
+
+  while (!stop_) {
+    sleep(monitor_sleep_time_);
+    time += monitor_sleep_time_;
+    seq_fail = seq_fail_;
+    socket_recv = socket_recv_;
+    client_call = client_call_;
+    num_client_req = num_client_req_;
+    num_propose = num_propose_;
+    num_prepare = num_prepare_;
+    num_commit = num_commit_;
+    pending_execute = pending_execute_;
+    execute = execute_;
+    execute_done = execute_done_;
+    broad_cast_msg = broad_cast_msg_;
+    send_broad_cast_msg = send_broad_cast_msg_;
+    send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
+    server_call = server_call_;
+    server_process = server_process_;
+    seq_gap = seq_gap_;
+    total_request = total_request_;
+    total_geo_request = total_geo_request_;
+    geo_request = geo_request_;
+
+    run_req_num = run_req_num_;
+    run_req_run_time = run_req_run_time_;
+
+    LOG(ERROR) << "=========== monitor =========\n"
+               << "server call:" << server_call - last_server_call
+               << " server process:" << server_process - last_server_process
+               << " socket recv:" << socket_recv - last_socket_recv
+               << " client call:" << client_call - last_client_call
+               << " client req:" << num_client_req - last_num_client_req
+               << " broad_cast:" << broad_cast_msg - last_broad_cast_msg
+               << " send broad_cast:"
+               << send_broad_cast_msg - last_send_broad_cast_msg
+               << " per send broad_cast:"
+               << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
+               << " propose:" << num_propose - last_num_propose
+               << " prepare:" << (num_prepare - last_num_prepare)
+               << " commit:" << (num_commit - last_num_commit)
+               << " pending execute:"
+               << pending_execute - last_pending_execute
+               << " execute:" << execute - last_execute
+               << " execute done:" << execute_done - last_execute_done
+               << " seq gap:" << seq_gap
+               << " total request:" << total_request - last_total_request
+               << " txn:"
+               << (total_request - last_total_request) / interval_sec
+               << " total geo request:"
+               << total_geo_request - last_total_geo_request
+               << " total geo request per:"
+               << (total_geo_request - last_total_geo_request) / interval_sec
+               << " geo request:" << (geo_request - last_geo_request)
+               << " seq fail:" << seq_fail - last_seq_fail
+               << " time:" << time
+               << " \n--------------- monitor ------------";
+    if (run_req_num - last_run_req_num > 0) {
+      // Accumulated run time divided by request count, scaled by 1e9.
+      LOG(ERROR) << "  req client latency:"
+                 << static_cast<double>(run_req_run_time -
+                                        last_run_req_run_time) /
+                        (run_req_num - last_run_req_num) / 1000000000.0;
+    }
+
+    last_seq_fail = seq_fail;
+    last_socket_recv = socket_recv;
+    last_client_call = client_call;
+    last_num_client_req = num_client_req;
+    last_num_propose = num_propose;
+    last_num_prepare = num_prepare;
+    last_num_commit = num_commit;
+    last_pending_execute = pending_execute;
+    last_execute = execute;
+    last_execute_done = execute_done;
+
+    last_broad_cast_msg = broad_cast_msg;
+    last_send_broad_cast_msg = send_broad_cast_msg;
+    last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
+
+    last_server_call = server_call;
+    last_server_process = server_process;
+
+    last_run_req_num = run_req_num;
+    last_run_req_run_time = run_req_run_time;
+    last_total_request = total_request;
+    last_total_geo_request = total_geo_request;
+    last_geo_request = geo_request;
+  }
+}
+
+// Counts one client call (also reported to Prometheus when configured).
+void Stats::IncClientCall() {
+  if (prometheus_) {
+    prometheus_->Inc(CLIENT_CALL, 1);
+  }
+  client_call_++;
+}
+
+// Counts one client request (also reported to Prometheus when configured).
+void Stats::IncClientRequest() {
+  if (prometheus_) {
+    prometheus_->Inc(CLIENT_REQ, 1);
+  }
+  num_client_req_++;
+}
+
+// Counts one propose message (also reported to Prometheus when configured).
+void Stats::IncPropose() {
+  if (prometheus_) {
+    prometheus_->Inc(PROPOSE, 1);
+  }
+  num_propose_++;
+}
+
+// Legacy, sequence-less variant: counts one prepare message and, with resview
+// enabled, appends the current time to the single transaction_summary_.
+void Stats::IncPrepare() {
+  if (prometheus_) {
+    prometheus_->Inc(PREPARE, 1);
+  }
+  num_prepare_++;
+  if (enable_resview) {
+    // The legacy state shares the mutex of the first telemetry shard.
+    std::lock_guard<std::mutex> lock(telemetry_mutex_[0]);
+    transaction_summary_.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Counts one prepare message and, with resview enabled, appends the current
+// time to the prepare list of the telemetry entry for `seq`, creating the
+// entry when it does not exist yet.
+void Stats::IncPrepare(uint64_t seq) {
+  if (prometheus_) {
+    prometheus_->Inc(PREPARE, 1);
+  }
+  num_prepare_++;
+  if (!enable_resview) {
+    return;
+  }
+
+  const size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    it = map.emplace(seq, NewTelemetryForSeq(static_telemetry_info_, seq))
+             .first;
+  }
+  it->second.prepare_message_count_times_list.push_back(
+      std::chrono::system_clock::now());
+}
+
+// Legacy, sequence-less variant: counts one commit message and, with resview
+// enabled, appends the current time to the single transaction_summary_.
+void Stats::IncCommit() {
+  if (prometheus_) {
+    prometheus_->Inc(COMMIT, 1);
+  }
+  num_commit_++;
+  if (enable_resview) {
+    // The legacy state shares the mutex of the first telemetry shard.
+    std::lock_guard<std::mutex> lock(telemetry_mutex_[0]);
+    transaction_summary_.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Counts one commit message and, with resview enabled, appends the current
+// time to the commit list of the telemetry entry for `seq`, creating the
+// entry when it does not exist yet.
+void Stats::IncCommit(uint64_t seq) {
+  if (prometheus_) {
+    prometheus_->Inc(COMMIT, 1);
+  }
+  num_commit_++;
+  if (!enable_resview) {
+    return;
+  }
+
+  const size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    it = map.emplace(seq, NewTelemetryForSeq(static_telemetry_info_, seq))
+             .first;
+  }
+  it->second.commit_message_count_times_list.push_back(
+      std::chrono::system_clock::now());
+}
+
+void Stats::IncPendingExecute() { pending_execute_++; }
+
+void Stats::IncExecute() { execute_++; }
+
+// Counts one finished execution (also reported to Prometheus when configured).
+void Stats::IncExecuteDone() {
+  if (prometheus_) {
+    prometheus_->Inc(EXECUTE, 1);
+  }
+  execute_done_++;
+}
+
+// Counts one broadcast message (also reported to Prometheus when configured).
+void Stats::BroadCastMsg() {
+  if (prometheus_) {
+    prometheus_->Inc(BROAD_CAST, 1);
+  }
+  broad_cast_msg_++;
+}
+
+void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
+
+void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
+
+void Stats::SeqFail() { seq_fail_++; }
+
+// Adds `num` to the total request counter (also reported to Prometheus when
+// configured).
+void Stats::IncTotalRequest(uint32_t num) {
+  if (prometheus_) {
+    prometheus_->Inc(NUM_EXECUTE_TX, num);
+  }
+  total_request_ += num;
+}
+
+void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
+
+void Stats::IncGeoRequest() { geo_request_++; }
+
+// Counts one server call (also reported to Prometheus when configured).
+void Stats::ServerCall() {
+  if (prometheus_) {
+    prometheus_->Inc(SERVER_CALL_NAME, 1);
+  }
+  server_call_++;
+}
+
+// Counts one processed server message (also reported to Prometheus when
+// configured).
+void Stats::ServerProcess() {
+  if (prometheus_) {
+    prometheus_->Inc(SERVER_PROCESS, 1);
+  }
+  server_process_++;
+}
+
+// Stores the latest sequence gap; unlike the counters this is overwritten,
+// not accumulated.
+void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
+
+// Accumulates one request and its run time for the client latency report
+// printed by MonitorGlobal().
+void Stats::AddLatency(uint64_t run_time) {
+  run_req_num_++;
+  run_req_run_time_ += run_time;
+}
+
+// Creates the Prometheus handler; until this is called no counter is
+// forwarded to Prometheus.
+void Stats::SetPrometheus(const std::string& prometheus_address) {
+  prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
+}
+
+}  // namespace resdb
\ No newline at end of file
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index c0ad30a7..4a0462bd 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h @@ -17,151 +17,255 @@ * under the License. */ - #pragma once - - #include <crow.h> - - #include <chrono> - #include <future> - #include <nlohmann/json.hpp> - - #include "boost/asio.hpp" - #include "boost/beast.hpp" - #include "platform/common/network/tcp_socket.h" - #include "platform/proto/resdb.pb.h" - #include "platform/statistic/prometheus_handler.h" - #include "proto/kv/kv.pb.h" - #include "sys/resource.h" - - namespace asio = boost::asio; - namespace beast = boost::beast; - using tcp = asio::ip::tcp; - - namespace resdb { - - struct VisualData { - // Set when initializing - int replica_id; - int primary_id; - std::string ip; - int port; - - // Set when new txn is received - int txn_number; - std::vector<std::string> txn_command; - std::vector<std::string> txn_key; - std::vector<std::string> txn_value; - - // Request state if primary_id==replica_id, pre_prepare state otherwise - std::chrono::system_clock::time_point request_pre_prepare_state_time; - std::chrono::system_clock::time_point prepare_state_time; - std::vector<std::chrono::system_clock::time_point> - prepare_message_count_times_list; - std::chrono::system_clock::time_point commit_state_time; - std::vector<std::chrono::system_clock::time_point> - commit_message_count_times_list; - std::chrono::system_clock::time_point execution_time; - - // Storage Engine Stats - double ext_cache_hit_ratio_; - std::string level_db_stats_; - std::string level_db_approx_mem_size_; - - // process stats - struct rusage process_stats_; - }; - - class Stats { - public: - static Stats* GetGlobalStats(int sleep_seconds = 5); - - void Stop(); - - void RetrieveProgress(); - void SetProps(int replica_id, std::string ip, int port, bool resview_flag, - bool faulty_flag); - void SetPrimaryId(int primary_id); - void SetStorageEngineMetrics(double ext_cache_hit_ratio, - std::string level_db_stats, - std::string level_db_approx_mem_size); - void RecordStateTime(std::string state); - void 
GetTransactionDetails(BatchUserRequest batch_request); - void SendSummary(); - void CrowRoute(); - bool IsFaulty(); - void ChangePrimary(int primary_id); - - void AddLatency(uint64_t run_time); - - void Monitor(); - void MonitorGlobal(); - - void IncSocketRecv(); - - void IncClientCall(); - - void IncClientRequest(); - void IncPropose(); - void IncPrepare(); - void IncCommit(); - void IncPendingExecute(); - void IncExecute(); - void IncExecuteDone(); - - void BroadCastMsg(); - void SendBroadCastMsg(uint32_t num); - void SendBroadCastMsgPerRep(); - void SeqFail(); - void IncTotalRequest(uint32_t num); - void IncTotalGeoRequest(uint32_t num); - void IncGeoRequest(); - - void SeqGap(uint64_t seq_gap); - // Network in->worker - void ServerCall(); - void ServerProcess(); - void SetPrometheus(const std::string& prometheus_address); - - protected: - Stats(int sleep_time = 5); - ~Stats(); - - private: - std::string monitor_port_ = "default"; - std::string name_; - std::atomic<int> num_call_, run_call_; - std::atomic<uint64_t> last_time_, run_time_, run_call_time_; - std::thread thread_; - std::atomic<bool> begin_; - std::atomic<bool> stop_; - std::condition_variable cv_; - std::mutex mutex_; - - std::thread global_thread_; - std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_, - num_commit_, pending_execute_, execute_, execute_done_; - std::atomic<uint64_t> client_call_, socket_recv_; - std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_, - send_broad_cast_msg_per_rep_; - std::atomic<uint64_t> seq_fail_; - std::atomic<uint64_t> server_call_, server_process_; - std::atomic<uint64_t> run_req_num_; - std::atomic<uint64_t> run_req_run_time_; - std::atomic<uint64_t> seq_gap_; - std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_; - int monitor_sleep_time_ = 5; // default 5s. 
- - std::thread crow_thread_; - bool enable_resview; - bool enable_faulty_switch_; - VisualData transaction_summary_; - std::atomic<bool> make_faulty_; - std::atomic<uint64_t> prev_num_prepare_; - std::atomic<uint64_t> prev_num_commit_; - nlohmann::json summary_json_; - nlohmann::json consensus_history_; - - std::unique_ptr<PrometheusHandler> prometheus_; - }; - - } // namespace resdb \ No newline at end of file
+#pragma once
+
+#include <crow.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <nlohmann/json.hpp>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "boost/asio.hpp"
+#include "boost/beast.hpp"
+#include "platform/common/network/tcp_socket.h"
+#include "platform/proto/resdb.pb.h"
+#include "platform/statistic/prometheus_handler.h"
+#include "proto/kv/kv.pb.h"
+#include "sys/resource.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
+
+namespace resdb {
+
+// Per-transaction telemetry data with automatic JSON serialization.
+struct TransactionTelemetry {
+  // Static info (set once, copied to each transaction)
+  int replica_id = 0;
+  int primary_id = 0;
+  std::string ip;
+  int port = -1;
+
+  // Transaction-specific data. The three vectors are filled in lock-step,
+  // one element per recorded KV operation.
+  uint64_t txn_number = 0;
+  std::vector<std::string> txn_command;
+  std::vector<std::string> txn_key;
+  std::vector<std::string> txn_value;
+
+  // Timestamps; time_point::min() means "not recorded yet".
+  std::chrono::system_clock::time_point request_pre_prepare_state_time;
+  std::chrono::system_clock::time_point prepare_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      prepare_message_count_times_list;
+  std::chrono::system_clock::time_point commit_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      commit_message_count_times_list;
+  std::chrono::system_clock::time_point execution_time;
+
+  // Storage Engine Stats
+  double ext_cache_hit_ratio_ = 0.0;
+  std::string level_db_stats_;
+  std::string level_db_approx_mem_size_;
+
+  // Starts every state timestamp at time_point::min().
+  TransactionTelemetry()
+      : request_pre_prepare_state_time(
+            std::chrono::system_clock::time_point::min()),
+        prepare_state_time(std::chrono::system_clock::time_point::min()),
+        commit_state_time(std::chrono::system_clock::time_point::min()),
+        execution_time(std::chrono::system_clock::time_point::min()) {}
+
+  // Serializes the record. Time points are written as their
+  // time_since_epoch() tick counts, and both message-timestamp keys are
+  // always present as arrays, even when empty.
+  nlohmann::json to_json() const {
+    nlohmann::json json;
+    json["replica_id"] = replica_id;
+    json["ip"] = ip;
+    json["port"] = port;
+    json["primary_id"] = primary_id;
+    json["propose_pre_prepare_time"] =
+        request_pre_prepare_state_time.time_since_epoch().count();
+    json["prepare_time"] = prepare_state_time.time_since_epoch().count();
+    json["commit_time"] = commit_state_time.time_since_epoch().count();
+    json["execution_time"] = execution_time.time_since_epoch().count();
+
+    json["prepare_message_timestamps"] = nlohmann::json::array();
+    for (const auto& ts : prepare_message_count_times_list) {
+      json["prepare_message_timestamps"].push_back(
+          ts.time_since_epoch().count());
+    }
+
+    json["commit_message_timestamps"] = nlohmann::json::array();
+    for (const auto& ts : commit_message_count_times_list) {
+      json["commit_message_timestamps"].push_back(
+          ts.time_since_epoch().count());
+    }
+
+    json["txn_number"] = txn_number;
+    json["txn_commands"] = txn_command;
+    json["txn_keys"] = txn_key;
+    json["txn_values"] = txn_value;
+    json["ext_cache_hit_ratio"] = ext_cache_hit_ratio_;
+    json["level_db_stats"] = level_db_stats_;
+    json["level_db_approx_mem_size"] = level_db_approx_mem_size_;
+
+    return json;
+  }
+};
+
+// Keep old VisualData for backwards compatibility if needed.
+// Scalar members carry initializers so a default-constructed instance never
+// exposes indeterminate values.
+struct VisualData {
+  // Set when initializing
+  int replica_id = 0;
+  int primary_id = 0;
+  std::string ip;
+  int port = -1;
+
+  // Set when new txn is received
+  int txn_number = 0;
+  std::vector<std::string> txn_command;
+  std::vector<std::string> txn_key;
+  std::vector<std::string> txn_value;
+
+  // Request state if primary_id==replica_id, pre_prepare state otherwise
+  std::chrono::system_clock::time_point request_pre_prepare_state_time;
+  std::chrono::system_clock::time_point prepare_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      prepare_message_count_times_list;
+  std::chrono::system_clock::time_point commit_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      commit_message_count_times_list;
+  std::chrono::system_clock::time_point execution_time;
+
+  // Storage Engine Stats
+  double ext_cache_hit_ratio_ = 0.0;
+  std::string level_db_stats_;
+  std::string level_db_approx_mem_size_;
+
+  // process stats
+  struct rusage process_stats_ {};
+};
+
+// Process-wide statistics: plain counters, optional Prometheus export and,
+// when resview is enabled, per-sequence consensus telemetry.
+class Stats {
+ public:
+  static Stats* GetGlobalStats(int sleep_seconds = 5);
+
+  void Stop();
+
+  void RetrieveProgress();
+  void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
+                bool faulty_flag);
+  void SetPrimaryId(int primary_id);
+  void SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                               std::string level_db_stats,
+                               std::string level_db_approx_mem_size);
+  // The overloads without `seq` are the legacy single-transaction API; the
+  // overloads taking `seq` record into the per-sequence telemetry map.
+  void RecordStateTime(std::string state);
+  void RecordStateTime(uint64_t seq, std::string state);
+  void GetTransactionDetails(BatchUserRequest batch_request);
+  void SendSummary();
+  void SendSummary(uint64_t seq);
+  void CrowRoute();
+  bool IsFaulty();
+  void ChangePrimary(int primary_id);
+
+  void AddLatency(uint64_t run_time);
+
+  void Monitor();
+  void MonitorGlobal();
+
+  void IncSocketRecv();
+
+  void IncClientCall();
+
+  void IncClientRequest();
+  void IncPropose();
+  void IncPrepare();
+  void IncPrepare(uint64_t seq);
+  void IncCommit();
+  void IncCommit(uint64_t seq);
+  void IncPendingExecute();
+  void IncExecute();
+  void IncExecuteDone();
+
+  void BroadCastMsg();
+  void SendBroadCastMsg(uint32_t num);
+  void SendBroadCastMsgPerRep();
+  void SeqFail();
+  void IncTotalRequest(uint32_t num);
+  void IncTotalGeoRequest(uint32_t num);
+  void IncGeoRequest();
+
+  void SeqGap(uint64_t seq_gap);
+  // Network in->worker
+  void ServerCall();
+  void ServerProcess();
+  void SetPrometheus(const std::string& prometheus_address);
+
+ protected:
+  Stats(int sleep_time = 5);
+  ~Stats();
+
+ private:
+  std::string monitor_port_ = "default";
+  std::string name_;
+  std::atomic<int> num_call_, run_call_;
+  std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
+  std::thread thread_;
+  std::atomic<bool> begin_;
+  std::atomic<bool> stop_;
+  std::condition_variable cv_;
+  std::mutex mutex_;
+
+  std::thread global_thread_;
+  std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
+      num_commit_, pending_execute_, execute_, execute_done_;
+  std::atomic<uint64_t> client_call_, socket_recv_;
+  std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
+      send_broad_cast_msg_per_rep_;
+  std::atomic<uint64_t> seq_fail_;
+  std::atomic<uint64_t> server_call_, server_process_;
+  std::atomic<uint64_t> run_req_num_;
+  std::atomic<uint64_t> run_req_run_time_;
+  std::atomic<uint64_t> seq_gap_;
+  std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
+  int monitor_sleep_time_ = 5;  // default 5s.
+
+  std::thread crow_thread_;
+  // Read by every telemetry entry point, so it must never be indeterminate.
+  bool enable_resview = false;
+  bool enable_faulty_switch_ = false;
+
+  // Old single transaction_summary_ for backwards compatibility
+  // TODO: Remove after full migration to sharded map
+  VisualData transaction_summary_;
+
+  // Sharded map solution for per-transaction telemetry.
+  // transaction_telemetry_map_[i] is guarded by telemetry_mutex_[i].
+  static const int kTelemetryShards = 256;
+  std::unordered_map<uint64_t, TransactionTelemetry>
+      transaction_telemetry_map_[kTelemetryShards];
+  std::mutex telemetry_mutex_[kTelemetryShards];
+  TransactionTelemetry
+      static_telemetry_info_;  // Static info to copy to new transactions
+  mutable std::unordered_set<uint64_t>
+      finalized_sequences_;  // Sequences that have been sent (don't create new
+                             // entries)
+  mutable std::mutex finalized_mutex_;  // Protect finalized_sequences_
+
+  // Helper to get shard index
+  size_t GetShardIndex(uint64_t seq) const;
+  TransactionTelemetry& GetOrCreateTelemetry(uint64_t seq);
+  bool IsFinalized(uint64_t seq) const;
+  void MarkFinalized(uint64_t seq);
+
+  std::atomic<bool> make_faulty_;
+  std::atomic<uint64_t> prev_num_prepare_;
+  std::atomic<uint64_t> prev_num_commit_;
+  nlohmann::json summary_json_;
+  nlohmann::json consensus_history_;
+  std::mutex consensus_history_mutex_;  // Protect consensus_history_ access
+
+  std::unique_ptr<PrometheusHandler> prometheus_;
+};
+
+}  // namespace resdb
\ No newline at end of file
