This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/development by this push:
     new 6d4cb97d per sequence telemetry tracking
6d4cb97d is described below

commit 6d4cb97dc209abf32b87f58ef2cf446483053c8e
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 06:34:27 2026 +0000

    per sequence telemetry tracking
---
 .../consensus/execution/transaction_executor.cpp   |    2 +-
 platform/consensus/ordering/pbft/commitment.cpp    |   14 +-
 platform/statistic/stats.cpp                       | 1444 ++++++++++++--------
 platform/statistic/stats.h                         |  400 ++++--
 4 files changed, 1118 insertions(+), 742 deletions(-)

diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index bb7cf6be..3e30d3f6 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -205,7 +205,7 @@ void TransactionExecutor::OrderMessage() {
 }
 
 void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
-  global_stats_->IncCommit();
+  global_stats_->IncCommit(message->seq());
   message->set_commit_time(GetCurrentTime());
   execute_queue_.Push(std::move(message));
 }
diff --git a/platform/consensus/ordering/pbft/commitment.cpp 
b/platform/consensus/ordering/pbft/commitment.cpp
index 5a970dd6..7ebb769f 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -135,7 +135,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> 
context,
     return -2;
   }
 
-  global_stats_->RecordStateTime("request");
+  global_stats_->RecordStateTime(*seq, "request");
 
   user_request->set_type(Request::TYPE_PRE_PREPARE);
   user_request->set_current_view(message_manager_->GetCurrentView());
@@ -229,7 +229,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> 
context,
   }
 
   global_stats_->IncPropose();
-  global_stats_->RecordStateTime("pre-prepare");
+  global_stats_->RecordStateTime(request->seq(), "pre-prepare");
   std::unique_ptr<Request> prepare_request = resdb::NewRequest(
       Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id());
   prepare_request->clear_data();
@@ -263,9 +263,9 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> 
context,
     }
     return ret;
   }
-  global_stats_->IncPrepare();
   uint64_t seq = request->seq();
   int sender_id = request->sender_id();
+  global_stats_->IncPrepare(seq);
   std::unique_ptr<Request> commit_request = resdb::NewRequest(
       Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
   commit_request->mutable_data_signature()->Clear();
@@ -289,7 +289,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> 
context,
       // LOG(ERROR) << "sign hash"
       //           << commit_request->data_signature().DebugString();
     }
-    global_stats_->RecordStateTime("prepare");
+    global_stats_->RecordStateTime(seq, "prepare");
     replica_communicator_->BroadCast(*commit_request);
   }
   return ret == CollectorResultCode::INVALID ? -2 : 0;
@@ -309,7 +309,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> 
context,
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
   }
-  global_stats_->IncCommit();
+  global_stats_->IncCommit(seq);
   // Add request to message_manager.
   // If it has received enough same requests(2f+1), message manager will
   // commit the request.
@@ -318,7 +318,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> 
context,
   if (ret == CollectorResultCode::STATE_CHANGED) {
     // LOG(ERROR)<<request->data().size();
     // global_stats_->GetTransactionDetails(request->data());
-    global_stats_->RecordStateTime("commit");
+    global_stats_->RecordStateTime(seq, "commit");
   }
   return ret == CollectorResultCode::INVALID ? -2 : 0;
 }
@@ -331,7 +331,7 @@ int Commitment::PostProcessExecutedMsg() {
     if (batch_resp == nullptr) {
       continue;
     }
-    global_stats_->SendSummary();
+    global_stats_->SendSummary(batch_resp->seq());
     Request request;
     request.set_hash(batch_resp->hash());
     request.set_seq(batch_resp->seq());
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 1c511b3b..8957e733 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -17,589 +17,861 @@
  * under the License.
  */
 
- #include "platform/statistic/stats.h"
-
- #include <glog/logging.h>
- 
- #include <ctime>
- 
- #include "common/utils/utils.h"
- #include "proto/kv/kv.pb.h"
- 
- namespace asio = boost::asio;
- namespace beast = boost::beast;
- using tcp = asio::ip::tcp;
- 
- namespace resdb {
- 
- std::mutex g_mutex;
- Stats* Stats::GetGlobalStats(int seconds) {
-   std::unique_lock<std::mutex> lk(g_mutex);
-   static Stats stats(seconds);
-   return &stats;
- }  // gets a singelton instance of Stats Class
- 
- Stats::Stats(int sleep_time) {
-   monitor_sleep_time_ = sleep_time;
- #ifdef TEST_MODE
-   monitor_sleep_time_ = 1;
- #endif
-   num_call_ = 0;
-   num_commit_ = 0;
-   run_time_ = 0;
-   run_call_ = 0;
-   run_call_time_ = 0;
-   server_call_ = 0;
-   server_process_ = 0;
-   run_req_num_ = 0;
-   run_req_run_time_ = 0;
-   seq_gap_ = 0;
-   total_request_ = 0;
-   total_geo_request_ = 0;
-   geo_request_ = 0;
- 
-   stop_ = false;
-   begin_ = false;
- 
-   socket_recv_ = 0;
-   broad_cast_msg_ = 0;
-   send_broad_cast_msg_ = 0;
- 
-   prometheus_ = nullptr;
-   global_thread_ =
-       std::thread(&Stats::MonitorGlobal, this);  // pass by reference
- 
-   transaction_summary_.port = -1;
- 
-   // Setup websocket here
-   make_faulty_.store(false);
-   transaction_summary_.request_pre_prepare_state_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.prepare_state_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.commit_state_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.execution_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.txn_number = 0;
- }
- 
- void Stats::Stop() { stop_ = true; }
- 
- Stats::~Stats() {
-   stop_ = true;
-   if (global_thread_.joinable()) {
-     global_thread_.join();
-   }
-   if (enable_resview && crow_thread_.joinable()) {
-     crow_thread_.join();
-   }
- }
- 
- int64_t GetRSS() {
-   int64_t rss = 0;
-   FILE* fp = NULL;
-   if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
-     return 0;
-   }
- 
-   uint64_t size, resident, share, text, lib, data, dt;
-   if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, 
&text,
-              &lib, &data, &dt) != 7) {
-     fclose(fp);
-     return 0;
-   }
-   fclose(fp);
- 
-   int64_t page_size = sysconf(_SC_PAGESIZE);
-   rss = resident * page_size;
- 
-   // Convert to MB
-   rss = rss / (1024 * 1024);
- 
-   return rss;
- }
- 
- void Stats::CrowRoute() {
-   crow::SimpleApp app;
-   while (!stop_) {
-     try {
-       CROW_ROUTE(app, "/consensus_data")
-           .methods("GET"_method)([this](const crow::request& req,
-                                         crow::response& res) {
-             LOG(ERROR) << "API 1";
-             res.set_header("Access-Control-Allow-Origin",
-                            "*");  // Allow requests from any origin
-             res.set_header("Access-Control-Allow-Methods",
-                            "GET, POST, OPTIONS");  // Specify allowed methods
-             res.set_header(
-                 "Access-Control-Allow-Headers",
-                 "Content-Type, Authorization");  // Specify allowed headers
- 
-             // Send your response
-             res.body = consensus_history_.dump();
-             res.end();
-           });
-       CROW_ROUTE(app, "/get_status")
-           .methods("GET"_method)([this](const crow::request& req,
-                                         crow::response& res) {
-             LOG(ERROR) << "API 2";
-             res.set_header("Access-Control-Allow-Origin",
-                            "*");  // Allow requests from any origin
-             res.set_header("Access-Control-Allow-Methods",
-                            "GET, POST, OPTIONS");  // Specify allowed methods
-             res.set_header(
-                 "Access-Control-Allow-Headers",
-                 "Content-Type, Authorization");  // Specify allowed headers
- 
-             // Send your response
-             res.body = IsFaulty() ? "Faulty" : "Not Faulty";
-             res.end();
-           });
-       CROW_ROUTE(app, "/make_faulty")
-           .methods("GET"_method)([this](const crow::request& req,
-                                         crow::response& res) {
-             LOG(ERROR) << "API 3";
-             res.set_header("Access-Control-Allow-Origin",
-                            "*");  // Allow requests from any origin
-             res.set_header("Access-Control-Allow-Methods",
-                            "GET, POST, OPTIONS");  // Specify allowed methods
-             res.set_header(
-                 "Access-Control-Allow-Headers",
-                 "Content-Type, Authorization");  // Specify allowed headers
- 
-             // Send your response
-             if (enable_faulty_switch_) {
-               make_faulty_.store(!make_faulty_.load());
-             }
-             res.body = "Success";
-             res.end();
-           });
-       CROW_ROUTE(app, "/transaction_data")
-           .methods("GET"_method)([this](const crow::request& req,
-                                         crow::response& res) {
-             LOG(ERROR) << "API 4";
-             res.set_header("Access-Control-Allow-Origin",
-                            "*");  // Allow requests from any origin
-             res.set_header("Access-Control-Allow-Methods",
-                            "GET, POST, OPTIONS");  // Specify allowed methods
-             res.set_header(
-                 "Access-Control-Allow-Headers",
-                 "Content-Type, Authorization");  // Specify allowed headers
- 
-             nlohmann::json mem_view_json;
-             int status =
-                 getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
-             if (status == 0) {
-               mem_view_json["resident_set_size"] = GetRSS();
-               mem_view_json["max_resident_set_size"] =
-                   transaction_summary_.process_stats_.ru_maxrss;
-               mem_view_json["num_reads"] =
-                   transaction_summary_.process_stats_.ru_inblock;
-               mem_view_json["num_writes"] =
-                   transaction_summary_.process_stats_.ru_oublock;
-             }
- 
-             mem_view_json["ext_cache_hit_ratio"] =
-                 transaction_summary_.ext_cache_hit_ratio_;
-             mem_view_json["level_db_stats"] =
-                 transaction_summary_.level_db_stats_;
-             mem_view_json["level_db_approx_mem_size"] =
-                 transaction_summary_.level_db_approx_mem_size_;
-             res.body = mem_view_json.dump();
-             mem_view_json.clear();
-             res.end();
-           });
-       app.port(8500 + transaction_summary_.port).multithreaded().run();
-       sleep(1);
-     } catch (const std::exception& e) {
-     }
-   }
-   app.stop();
- }
- 
- bool Stats::IsFaulty() { return make_faulty_.load(); }
- 
- void Stats::ChangePrimary(int primary_id) {
-   transaction_summary_.primary_id = primary_id;
-   make_faulty_.store(false);
- }
- 
- void Stats::SetProps(int replica_id, std::string ip, int port,
-                      bool resview_flag, bool faulty_flag) {
-   transaction_summary_.replica_id = replica_id;
-   transaction_summary_.ip = ip;
-   transaction_summary_.port = port;
-   enable_resview = resview_flag;
-   enable_faulty_switch_ = faulty_flag;
-   if (resview_flag) {
-     crow_thread_ = std::thread(&Stats::CrowRoute, this);
-   }
- }
- 
- void Stats::SetPrimaryId(int primary_id) {
-   transaction_summary_.primary_id = primary_id;
- }
- 
- void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
-                                     std::string level_db_stats,
-                                     std::string level_db_approx_mem_size) {
-   transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
-   transaction_summary_.level_db_stats_ = level_db_stats;
-   transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
- }
- 
- void Stats::RecordStateTime(std::string state) {
-   if (!enable_resview) {
-     return;
-   }
-   if (state == "request" || state == "pre-prepare") {
-     transaction_summary_.request_pre_prepare_state_time =
-         std::chrono::system_clock::now();
-   } else if (state == "prepare") {
-     transaction_summary_.prepare_state_time = 
std::chrono::system_clock::now();
-   } else if (state == "commit") {
-     transaction_summary_.commit_state_time = std::chrono::system_clock::now();
-   }
- }
- 
- void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
-   if (!enable_resview) {
-     return;
-   }
-   transaction_summary_.txn_number = batch_request.seq();
-   transaction_summary_.txn_command.clear();
-   transaction_summary_.txn_key.clear();
-   transaction_summary_.txn_value.clear();
-   for (auto& sub_request : batch_request.user_requests()) {
-     KVRequest kv_request;
-     if (!kv_request.ParseFromString(sub_request.request().data())) {
-       break;
-     }
-     if (kv_request.cmd() == KVRequest::SET) {
-       transaction_summary_.txn_command.push_back("SET");
-       transaction_summary_.txn_key.push_back(kv_request.key());
-       transaction_summary_.txn_value.push_back(kv_request.value());
-     } else if (kv_request.cmd() == KVRequest::GET) {
-       transaction_summary_.txn_command.push_back("GET");
-       transaction_summary_.txn_key.push_back(kv_request.key());
-       transaction_summary_.txn_value.push_back("");
-     } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
-       transaction_summary_.txn_command.push_back("GETALLVALUES");
-       transaction_summary_.txn_key.push_back(kv_request.key());
-       transaction_summary_.txn_value.push_back("");
-     } else if (kv_request.cmd() == KVRequest::GETRANGE) {
-       transaction_summary_.txn_command.push_back("GETRANGE");
-       transaction_summary_.txn_key.push_back(kv_request.key());
-       transaction_summary_.txn_value.push_back(kv_request.value());
-     }
-   }
- }
- 
- void Stats::SendSummary() {
-   if (!enable_resview) {
-     return;
-   }
-   transaction_summary_.execution_time = std::chrono::system_clock::now();
- 
-   // Convert Transaction Summary to JSON
-   summary_json_["replica_id"] = transaction_summary_.replica_id;
-   summary_json_["ip"] = transaction_summary_.ip;
-   summary_json_["port"] = transaction_summary_.port;
-   summary_json_["primary_id"] = transaction_summary_.primary_id;
-   summary_json_["propose_pre_prepare_time"] =
-       transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
-           .count();
-   summary_json_["prepare_time"] =
-       transaction_summary_.prepare_state_time.time_since_epoch().count();
-   summary_json_["commit_time"] =
-       transaction_summary_.commit_state_time.time_since_epoch().count();
-   summary_json_["execution_time"] =
-       transaction_summary_.execution_time.time_since_epoch().count();
-   for (size_t i = 0;
-        i < transaction_summary_.prepare_message_count_times_list.size(); i++) 
{
-     summary_json_["prepare_message_timestamps"].push_back(
-         transaction_summary_.prepare_message_count_times_list[i]
-             .time_since_epoch()
-             .count());
-   }
-   for (size_t i = 0;
-        i < transaction_summary_.commit_message_count_times_list.size(); i++) {
-     summary_json_["commit_message_timestamps"].push_back(
-         transaction_summary_.commit_message_count_times_list[i]
-             .time_since_epoch()
-             .count());
-   }
-   summary_json_["txn_number"] = transaction_summary_.txn_number;
-   for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
-     summary_json_["txn_commands"].push_back(
-         transaction_summary_.txn_command[i]);
-   }
-   for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
-     summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
-   }
-   for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
-     summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
-   }
- 
-   summary_json_["ext_cache_hit_ratio"] =
-       transaction_summary_.ext_cache_hit_ratio_;
-   consensus_history_[std::to_string(transaction_summary_.txn_number)] =
-       summary_json_;
- 
-   LOG(ERROR) << summary_json_.dump();
- 
-   // Reset Transaction Summary Parameters
-   transaction_summary_.request_pre_prepare_state_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.prepare_state_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.commit_state_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.execution_time =
-       std::chrono::system_clock::time_point::min();
-   transaction_summary_.prepare_message_count_times_list.clear();
-   transaction_summary_.commit_message_count_times_list.clear();
- 
-   summary_json_.clear();
- }
- 
- void Stats::MonitorGlobal() {
-   LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
- 
-   uint64_t seq_fail = 0;
-   uint64_t client_call = 0, socket_recv = 0;
-   uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 
0,
-            pending_execute = 0, execute = 0, execute_done = 0;
-   uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
-   uint64_t send_broad_cast_msg_per_rep = 0;
-   uint64_t server_call = 0, server_process = 0;
-   uint64_t seq_gap = 0;
-   uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
- 
-   // ====== for client proxy ======
-   uint64_t run_req_num = 0, run_req_run_time = 0;
- 
-   uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
-   // =============================
- 
-   uint64_t last_seq_fail = 0;
-   uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 
0,
-            last_num_commit = 0;
-   uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
-   uint64_t last_client_call = 0, last_socket_recv = 0;
-   uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
-   uint64_t last_send_broad_cast_msg_per_rep = 0;
-   uint64_t last_server_call = 0, last_server_process = 0;
-   uint64_t last_total_request = 0, last_total_geo_request = 0,
-            last_geo_request = 0;
-   uint64_t time = 0;
- 
-   while (!stop_) {
-     sleep(monitor_sleep_time_);
-     time += monitor_sleep_time_;
-     seq_fail = seq_fail_;
-     socket_recv = socket_recv_;
-     client_call = client_call_;
-     num_client_req = num_client_req_;
-     num_propose = num_propose_;
-     num_prepare = num_prepare_;
-     num_commit = num_commit_;
-     pending_execute = pending_execute_;
-     execute = execute_;
-     execute_done = execute_done_;
-     broad_cast_msg = broad_cast_msg_;
-     send_broad_cast_msg = send_broad_cast_msg_;
-     send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
-     server_call = server_call_;
-     server_process = server_process_;
-     seq_gap = seq_gap_;
-     total_request = total_request_;
-     total_geo_request = total_geo_request_;
-     geo_request = geo_request_;
- 
-     run_req_num = run_req_num_;
-     run_req_run_time = run_req_run_time_;
- 
-     LOG(ERROR) << "=========== monitor =========\n"
-                << "server call:" << server_call - last_server_call
-                << " server process:" << server_process - last_server_process
-                << " socket recv:" << socket_recv - last_socket_recv
-                << " "
-                   "client call:"
-                << client_call - last_client_call
-                << " "
-                   "client req:"
-                << num_client_req - last_num_client_req
-                << " "
-                   "broad_cast:"
-                << broad_cast_msg - last_broad_cast_msg
-                << " "
-                   "send broad_cast:"
-                << send_broad_cast_msg - last_send_broad_cast_msg
-                << " "
-                   "per send broad_cast:"
-                << send_broad_cast_msg_per_rep - 
last_send_broad_cast_msg_per_rep
-                << " "
-                   "propose:"
-                << num_propose - last_num_propose
-                << " "
-                   "prepare:"
-                << (num_prepare - last_num_prepare)
-                << " "
-                   "commit:"
-                << (num_commit - last_num_commit)
-                << " "
-                   "pending execute:"
-                << pending_execute - last_pending_execute
-                << " "
-                   "execute:"
-                << execute - last_execute
-                << " "
-                   "execute done:"
-                << execute_done - last_execute_done << " seq gap:" << seq_gap
-                << " total request:" << total_request - last_total_request
-                << " txn:" << (total_request - last_total_request) / 5
-                << " total geo request:"
-                << total_geo_request - last_total_geo_request
-                << " total geo request per:"
-                << (total_geo_request - last_total_geo_request) / 5
-                << " geo request:" << (geo_request - last_geo_request)
-                << " "
-                   "seq fail:"
-                << seq_fail - last_seq_fail << " time:" << time
-                << " "
-                   "\n--------------- monitor ------------";
-     if (run_req_num - last_run_req_num > 0) {
-       LOG(ERROR) << "  req client latency:"
-                  << static_cast<double>(run_req_run_time -
-                                         last_run_req_run_time) /
-                         (run_req_num - last_run_req_num) / 1000000000.0;
-     }
- 
-     last_seq_fail = seq_fail;
-     last_socket_recv = socket_recv;
-     last_client_call = client_call;
-     last_num_client_req = num_client_req;
-     last_num_propose = num_propose;
-     last_num_prepare = num_prepare;
-     last_num_commit = num_commit;
-     last_pending_execute = pending_execute;
-     last_execute = execute;
-     last_execute_done = execute_done;
- 
-     last_broad_cast_msg = broad_cast_msg;
-     last_send_broad_cast_msg = send_broad_cast_msg;
-     last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
- 
-     last_server_call = server_call;
-     last_server_process = server_process;
- 
-     last_run_req_num = run_req_num;
-     last_run_req_run_time = run_req_run_time;
-     last_total_request = total_request;
-     last_total_geo_request = total_geo_request;
-     last_geo_request = geo_request;
-   }
- }
- 
- void Stats::IncClientCall() {
-   if (prometheus_) {
-     prometheus_->Inc(CLIENT_CALL, 1);
-   }
-   client_call_++;
- }
- 
- void Stats::IncClientRequest() {
-   if (prometheus_) {
-     prometheus_->Inc(CLIENT_REQ, 1);
-   }
-   num_client_req_++;
- }
- 
- void Stats::IncPropose() {
-   if (prometheus_) {
-     prometheus_->Inc(PROPOSE, 1);
-   }
-   num_propose_++;
- }
- 
- void Stats::IncPrepare() {
-   if (prometheus_) {
-     prometheus_->Inc(PREPARE, 1);
-   }
-   num_prepare_++;
-   transaction_summary_.prepare_message_count_times_list.push_back(
-       std::chrono::system_clock::now());
- }
- 
- void Stats::IncCommit() {
-   if (prometheus_) {
-     prometheus_->Inc(COMMIT, 1);
-   }
-   num_commit_++;
-   transaction_summary_.commit_message_count_times_list.push_back(
-       std::chrono::system_clock::now());
- }
- 
- void Stats::IncPendingExecute() { pending_execute_++; }
- 
- void Stats::IncExecute() { execute_++; }
- 
- void Stats::IncExecuteDone() {
-   if (prometheus_) {
-     prometheus_->Inc(EXECUTE, 1);
-   }
-   execute_done_++;
- }
- 
- void Stats::BroadCastMsg() {
-   if (prometheus_) {
-     prometheus_->Inc(BROAD_CAST, 1);
-   }
-   broad_cast_msg_++;
- }
- 
- void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
- 
- void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
- 
- void Stats::SeqFail() { seq_fail_++; }
- 
- void Stats::IncTotalRequest(uint32_t num) {
-   if (prometheus_) {
-     prometheus_->Inc(NUM_EXECUTE_TX, num);
-   }
-   total_request_ += num;
- }
- 
- void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
- 
- void Stats::IncGeoRequest() { geo_request_++; }
- 
- void Stats::ServerCall() {
-   if (prometheus_) {
-     prometheus_->Inc(SERVER_CALL_NAME, 1);
-   }
-   server_call_++;
- }
- 
- void Stats::ServerProcess() {
-   if (prometheus_) {
-     prometheus_->Inc(SERVER_PROCESS, 1);
-   }
-   server_process_++;
- }
- 
- void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
- 
- void Stats::AddLatency(uint64_t run_time) {
-   run_req_num_++;
-   run_req_run_time_ += run_time;
- }
- 
- void Stats::SetPrometheus(const std::string& prometheus_address) {
-   prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
- }
- 
- }  // namespace resdb
\ No newline at end of file
+#include "platform/statistic/stats.h"
+
+#include <glog/logging.h>
+
+#include <ctime>
+
+#include "common/utils/utils.h"
+#include "proto/kv/kv.pb.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
+
+namespace resdb {
+
+std::mutex g_mutex;
+Stats* Stats::GetGlobalStats(int seconds) {
+  std::unique_lock<std::mutex> lk(g_mutex);
+  static Stats stats(seconds);
+  return &stats;
+}  // gets a singelton instance of Stats Class
+
+Stats::Stats(int sleep_time) {
+  monitor_sleep_time_ = sleep_time;
+#ifdef TEST_MODE
+  monitor_sleep_time_ = 1;
+#endif
+  num_call_ = 0;
+  num_commit_ = 0;
+  run_time_ = 0;
+  run_call_ = 0;
+  run_call_time_ = 0;
+  server_call_ = 0;
+  server_process_ = 0;
+  run_req_num_ = 0;
+  run_req_run_time_ = 0;
+  seq_gap_ = 0;
+  total_request_ = 0;
+  total_geo_request_ = 0;
+  geo_request_ = 0;
+
+  stop_ = false;
+  begin_ = false;
+
+  socket_recv_ = 0;
+  broad_cast_msg_ = 0;
+  send_broad_cast_msg_ = 0;
+
+  prometheus_ = nullptr;
+  global_thread_ =
+      std::thread(&Stats::MonitorGlobal, this);  // pass by reference
+
+  transaction_summary_.port = -1;
+
+  // Setup websocket here
+  make_faulty_.store(false);
+  transaction_summary_.request_pre_prepare_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.prepare_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.commit_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.execution_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.txn_number = 0;
+
+  // Initialize static telemetry info
+  static_telemetry_info_.port = -1;
+}
+
+void Stats::Stop() { stop_ = true; }
+
+Stats::~Stats() {
+  stop_ = true;
+  if (global_thread_.joinable()) {
+    global_thread_.join();
+  }
+  if (enable_resview && crow_thread_.joinable()) {
+    crow_thread_.join();
+  }
+}
+
+int64_t GetRSS() {
+  int64_t rss = 0;
+  FILE* fp = NULL;
+  if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
+    return 0;
+  }
+
+  uint64_t size, resident, share, text, lib, data, dt;
+  if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, 
&text,
+             &lib, &data, &dt) != 7) {
+    fclose(fp);
+    return 0;
+  }
+  fclose(fp);
+
+  int64_t page_size = sysconf(_SC_PAGESIZE);
+  rss = resident * page_size;
+
+  // Convert to MB
+  rss = rss / (1024 * 1024);
+
+  return rss;
+}
+
+void Stats::CrowRoute() {
+  crow::SimpleApp app;
+  while (!stop_) {
+    try {
+      CROW_ROUTE(app, "/consensus_data")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res) {
+            LOG(ERROR) << "API 1";
+            res.set_header("Access-Control-Allow-Origin",
+                           "*");  // Allow requests from any origin
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");  // Specify allowed methods
+            res.set_header(
+                "Access-Control-Allow-Headers",
+                "Content-Type, Authorization");  // Specify allowed headers
+
+            // Send your response
+            {
+              std::lock_guard<std::mutex> lock(consensus_history_mutex_);
+              res.body = consensus_history_.dump();
+            }
+            res.end();
+          });
+      CROW_ROUTE(app, "/get_status")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res) {
+            LOG(ERROR) << "API 2";
+            res.set_header("Access-Control-Allow-Origin",
+                           "*");  // Allow requests from any origin
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");  // Specify allowed methods
+            res.set_header(
+                "Access-Control-Allow-Headers",
+                "Content-Type, Authorization");  // Specify allowed headers
+
+            // Send your response
+            res.body = IsFaulty() ? "Faulty" : "Not Faulty";
+            res.end();
+          });
+      CROW_ROUTE(app, "/make_faulty")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res) {
+            LOG(ERROR) << "API 3";
+            res.set_header("Access-Control-Allow-Origin",
+                           "*");  // Allow requests from any origin
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");  // Specify allowed methods
+            res.set_header(
+                "Access-Control-Allow-Headers",
+                "Content-Type, Authorization");  // Specify allowed headers
+
+            // Send your response
+            if (enable_faulty_switch_) {
+              make_faulty_.store(!make_faulty_.load());
+            }
+            res.body = "Success";
+            res.end();
+          });
+      CROW_ROUTE(app, "/transaction_data")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res) {
+            LOG(ERROR) << "API 4";
+            res.set_header("Access-Control-Allow-Origin",
+                           "*");  // Allow requests from any origin
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");  // Specify allowed methods
+            res.set_header(
+                "Access-Control-Allow-Headers",
+                "Content-Type, Authorization");  // Specify allowed headers
+
+            nlohmann::json mem_view_json;
+            int status =
+                getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
+            if (status == 0) {
+              mem_view_json["resident_set_size"] = GetRSS();
+              mem_view_json["max_resident_set_size"] =
+                  transaction_summary_.process_stats_.ru_maxrss;
+              mem_view_json["num_reads"] =
+                  transaction_summary_.process_stats_.ru_inblock;
+              mem_view_json["num_writes"] =
+                  transaction_summary_.process_stats_.ru_oublock;
+            }
+
+            mem_view_json["ext_cache_hit_ratio"] =
+                transaction_summary_.ext_cache_hit_ratio_;
+            mem_view_json["level_db_stats"] =
+                transaction_summary_.level_db_stats_;
+            mem_view_json["level_db_approx_mem_size"] =
+                transaction_summary_.level_db_approx_mem_size_;
+            res.body = mem_view_json.dump();
+            mem_view_json.clear();
+            res.end();
+          });
+      app.port(8500 + transaction_summary_.port).multithreaded().run();
+      sleep(1);
+    } catch (const std::exception& e) {
+    }
+  }
+  app.stop();
+}
+
+bool Stats::IsFaulty() { return make_faulty_.load(); }
+
+// Records a primary change and clears any injected fault so the replica
+// resumes normal behaviour under the new primary.
+// NOTE(review): unlike SetPrimaryId(), this does not update
+// static_telemetry_info_.primary_id, so telemetry entries created after a
+// view change still carry the old primary - confirm whether that is intended.
+void Stats::ChangePrimary(int primary_id) {
+  transaction_summary_.primary_id = primary_id;
+  make_faulty_.store(false);
+}
+
+// One-time setup of replica identity (id/ip/port) for both the legacy
+// transaction_summary_ and the per-sequence static_telemetry_info_ template.
+// When resview_flag is set, also starts the Crow HTTP server thread that
+// serves the visualization endpoints.
+void Stats::SetProps(int replica_id, std::string ip, int port,
+                     bool resview_flag, bool faulty_flag) {
+  transaction_summary_.replica_id = replica_id;
+  transaction_summary_.ip = ip;
+  transaction_summary_.port = port;
+  // Also set static telemetry info for new transactions
+  static_telemetry_info_.replica_id = replica_id;
+  static_telemetry_info_.ip = ip;
+  static_telemetry_info_.port = port;
+  enable_resview = resview_flag;
+  enable_faulty_switch_ = faulty_flag;
+  if (resview_flag) {
+    // Crow server runs on its own thread; see CrowRoute() for the routes.
+    crow_thread_ = std::thread(&Stats::CrowRoute, this);
+  }
+}
+
+// Records the current primary in both the legacy summary and the static
+// template copied into every new per-sequence telemetry entry.
+void Stats::SetPrimaryId(int primary_id) {
+  transaction_summary_.primary_id = primary_id;
+  static_telemetry_info_.primary_id = primary_id;
+}
+
+// Updates storage-engine metrics (cache hit ratio, LevelDB stats strings) in
+// both the legacy summary and the static template for new telemetry entries.
+// Existing per-sequence entries are not retroactively updated.
+void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                                    std::string level_db_stats,
+                                    std::string level_db_approx_mem_size) {
+  transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
+  transaction_summary_.level_db_stats_ = level_db_stats;
+  transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+  static_telemetry_info_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
+  static_telemetry_info_.level_db_stats_ = level_db_stats;
+  static_telemetry_info_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+}
+
+// Maps a sequence number to one of the kTelemetryShards mutex/map shards.
+// Consecutive sequence numbers land on different shards, spreading lock
+// contention across concurrently processed transactions.
+size_t Stats::GetShardIndex(uint64_t seq) const {
+  return seq % kTelemetryShards;
+}
+
+// Looks up the telemetry entry for `seq` in its shard, creating it from the
+// static template if absent (timestamps stay at time_point::min()).
+// NOTE(review): the shard lock is released when this function returns, but
+// the reference it hands back is then accessed unguarded by the caller -
+// confirm callers only touch it from a context that cannot race with the
+// per-shard writers (RecordStateTime/IncPrepare/IncCommit/SendSummary).
+TransactionTelemetry& Stats::GetOrCreateTelemetry(uint64_t seq) {
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    // Initialize with static info - only copy static fields, preserve
+    // timestamps as min
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ =
+        static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ =
+        static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    // Timestamps remain at min() which is correct
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+  return it->second;
+}
+
+// Legacy (pre-per-sequence) variant: stamps the named PBFT phase time on the
+// single shared transaction_summary_. Kept for callers not yet migrated to
+// RecordStateTime(seq, state). No-op unless resview is enabled. Unrecognized
+// state strings are silently ignored.
+void Stats::RecordStateTime(std::string state) {
+  // Old method without seq - deprecated, but keep for backwards compatibility
+  // This uses the old single transaction_summary_ approach
+  if (!enable_resview) {
+    return;
+  }
+  // Temporary fix: protect access to transaction_summary_
+  std::lock_guard<std::mutex> lock(
+      telemetry_mutex_[0]);  // Use first shard for old code
+  if (state == "request" || state == "pre-prepare") {
+    transaction_summary_.request_pre_prepare_state_time =
+        std::chrono::system_clock::now();
+  } else if (state == "prepare") {
+    transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
+  } else if (state == "commit") {
+    transaction_summary_.commit_state_time = std::chrono::system_clock::now();
+  }
+}
+
+// Per-sequence variant: stamps the named PBFT phase timestamp on the
+// telemetry entry for `seq`, creating the entry from the static template if
+// it does not exist yet. Only the matching timestamp field is written; all
+// other fields are left untouched. No-op unless resview is enabled.
+// NOTE(review): the create-if-absent body duplicates GetOrCreateTelemetry();
+// it cannot call it directly while holding the shard lock (std::mutex is not
+// recursive) - consider a shared unlocked helper to remove the duplication.
+void Stats::RecordStateTime(uint64_t seq, std::string state) {
+  if (!enable_resview) {
+    return;
+  }
+
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    // Initialize with static info - DO NOT reset existing entry
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ =
+        static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ =
+        static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    // Timestamps start at min() - correct
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  // Entry exists - update timestamp only, don't touch other fields
+  auto& telemetry = it->second;
+  auto now = std::chrono::system_clock::now();
+
+  if (state == "request" || state == "pre-prepare") {
+    telemetry.request_pre_prepare_state_time = now;
+  } else if (state == "prepare") {
+    telemetry.prepare_state_time = now;
+  } else if (state == "commit") {
+    telemetry.commit_state_time = now;
+  }
+}
+
+// Extracts the KV commands/keys/values from a batch request and stores them
+// on the per-sequence telemetry entry (creating it from the static template
+// if absent). Clears and rebuilds the command/key/value vectors, so calling
+// twice for the same seq is idempotent. Timestamp fields are never modified
+// here. No-op unless resview is enabled.
+void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+  if (!enable_resview) {
+    return;
+  }
+
+  uint64_t seq = batch_request.seq();
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  bool entry_existed = (it != map.end());
+
+  if (!entry_existed) {
+    // Initialize with static info - timestamps start at min() which is correct
+    // They will be set by RecordStateTime() calls later
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ =
+        static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ =
+        static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    // Timestamps remain at min() - will be updated by RecordStateTime() if
+    // needed
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  // Entry exists (either just created or already existed)
+  // CRITICAL: Only update transaction details fields, NEVER touch timestamp
+  // fields
+  auto& telemetry = it->second;
+  telemetry.txn_number = seq;
+
+  // Only update transaction detail fields - timestamps are NEVER modified here
+  telemetry.txn_command.clear();
+  telemetry.txn_key.clear();
+  telemetry.txn_value.clear();
+
+  for (auto& sub_request : batch_request.user_requests()) {
+    KVRequest kv_request;
+    if (!kv_request.ParseFromString(sub_request.request().data())) {
+      // NOTE(review): `break` abandons the remaining sub-requests on the
+      // first parse failure, leaving partial details - confirm whether
+      // `continue` (skip just the bad one) was intended.
+      break;
+    }
+
+    // Read commands record an empty value; SET/GETRANGE keep the payload.
+    if (kv_request.cmd() == KVRequest::SET) {
+      telemetry.txn_command.push_back("SET");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back(kv_request.value());
+    } else if (kv_request.cmd() == KVRequest::GET) {
+      telemetry.txn_command.push_back("GET");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back("");
+    } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
+      telemetry.txn_command.push_back("GETALLVALUES");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back("");
+    } else if (kv_request.cmd() == KVRequest::GETRANGE) {
+      telemetry.txn_command.push_back("GETRANGE");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back(kv_request.value());
+    }
+  }
+
+  // Timestamps (request_pre_prepare_state_time, prepare_state_time,
+  // commit_state_time, execution_time, prepare_message_count_times_list,
+  // commit_message_count_times_list) are NEVER modified in this function -
+  // they are only set by RecordStateTime(), IncPrepare(), IncCommit(), and
+  // SendSummary()
+}
+
+// Legacy (pre-per-sequence) variant: serializes the single shared
+// transaction_summary_ to JSON, appends it to consensus_history_ keyed by
+// txn_number, logs it, then resets the summary's timestamps and message-time
+// lists for the next transaction. No-op unless resview is enabled.
+void Stats::SendSummary() {
+  // Old method without seq - deprecated, but keep for backwards compatibility
+  // This uses the old single transaction_summary_ approach
+  if (!enable_resview) {
+    return;
+  }
+  std::lock_guard<std::mutex> lock(
+      telemetry_mutex_[0]);  // Use first shard for old code
+
+  transaction_summary_.execution_time = std::chrono::system_clock::now();
+
+  // Convert Transaction Summary to JSON (old manual way)
+  summary_json_["replica_id"] = transaction_summary_.replica_id;
+  summary_json_["ip"] = transaction_summary_.ip;
+  summary_json_["port"] = transaction_summary_.port;
+  summary_json_["primary_id"] = transaction_summary_.primary_id;
+  summary_json_["propose_pre_prepare_time"] =
+      transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+          .count();
+  summary_json_["prepare_time"] =
+      transaction_summary_.prepare_state_time.time_since_epoch().count();
+  summary_json_["commit_time"] =
+      transaction_summary_.commit_state_time.time_since_epoch().count();
+  summary_json_["execution_time"] =
+      transaction_summary_.execution_time.time_since_epoch().count();
+  for (size_t i = 0;
+       i < transaction_summary_.prepare_message_count_times_list.size(); i++) {
+    summary_json_["prepare_message_timestamps"].push_back(
+        transaction_summary_.prepare_message_count_times_list[i]
+            .time_since_epoch()
+            .count());
+  }
+  for (size_t i = 0;
+       i < transaction_summary_.commit_message_count_times_list.size(); i++) {
+    summary_json_["commit_message_timestamps"].push_back(
+        transaction_summary_.commit_message_count_times_list[i]
+            .time_since_epoch()
+            .count());
+  }
+  summary_json_["txn_number"] = transaction_summary_.txn_number;
+  for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
+    summary_json_["txn_commands"].push_back(
+        transaction_summary_.txn_command[i]);
+  }
+  for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
+    summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
+  }
+  for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
+    summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
+  }
+
+  summary_json_["ext_cache_hit_ratio"] =
+      transaction_summary_.ext_cache_hit_ratio_;
+
+  {
+    std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
+    consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+        summary_json_;
+  }
+
+  LOG(ERROR) << summary_json_.dump();
+
+  // Reset Transaction Summary Parameters
+  transaction_summary_.request_pre_prepare_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.prepare_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.commit_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.execution_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.prepare_message_count_times_list.clear();
+  transaction_summary_.commit_message_count_times_list.clear();
+
+  summary_json_.clear();
+}
+
+// Per-sequence variant: stamps execution_time on the telemetry entry for
+// `seq`, serializes it via TransactionTelemetry::to_json(), and merges the
+// result into consensus_history_. Warns and returns if no entry exists.
+// No-op unless resview is enabled.
+// NOTE(review): entries are deliberately never erased (see comment at the
+// bottom), so transaction_telemetry_map_ grows without bound over a long
+// run - confirm a cleanup/eviction policy exists elsewhere.
+void Stats::SendSummary(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    LOG(WARNING) << "SendSummary called for unknown transaction: " << seq;
+    return;
+  }
+
+  auto& telemetry = it->second;
+
+  // CRITICAL: Set execution_time before serialization, but don't modify any
+  // other fields
+  telemetry.execution_time = std::chrono::system_clock::now();
+
+  // Use automatic JSON serialization - this reads current state, doesn't
+  // modify it
+  nlohmann::json summary_json = telemetry.to_json();
+
+  {
+    std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
+    // Only update if we don't already have this sequence, or update existing
+    // This prevents overwriting good data with bad data
+    std::string seq_str = std::to_string(seq);
+    if (consensus_history_.find(seq_str) == consensus_history_.end()) {
+      // First time storing - always store
+      consensus_history_[seq_str] = summary_json;
+    } else {
+      // Already exists - only update if new data has more complete information
+      // (i.e., has transaction details or more timestamps)
+      auto& existing = consensus_history_[seq_str];
+      bool new_has_details = !summary_json["txn_commands"].empty();
+      bool existing_has_details = !existing["txn_commands"].empty();
+
+      // If new data has details and existing doesn't, merge them
+      if (new_has_details && !existing_has_details) {
+        existing["txn_commands"] = summary_json["txn_commands"];
+        existing["txn_keys"] = summary_json["txn_keys"];
+        existing["txn_values"] = summary_json["txn_values"];
+      }
+
+      // Always update execution_time as it's the latest
+      existing["execution_time"] = summary_json["execution_time"];
+    }
+  }
+
+  LOG(ERROR) << summary_json.dump();
+
+  // DON'T delete the entry - per-sequence state means each seq has its own
+  // entry and it will naturally be reused or cleaned up later if needed
+  // Deleting causes issues if SendSummary() is called multiple times or
+  // if telemetry updates happen after sending
+}
+
+// Background monitor loop: every monitor_sleep_time_ seconds, snapshots all
+// atomic counters, logs the per-interval deltas, and carries the snapshot
+// forward as the baseline for the next interval. Runs until stop_ is set.
+void Stats::MonitorGlobal() {
+  LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
+
+  uint64_t seq_fail = 0;
+  uint64_t client_call = 0, socket_recv = 0;
+  uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0,
+           pending_execute = 0, execute = 0, execute_done = 0;
+  uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
+  uint64_t send_broad_cast_msg_per_rep = 0;
+  uint64_t server_call = 0, server_process = 0;
+  uint64_t seq_gap = 0;
+  uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
+
+  // ====== for client proxy ======
+  uint64_t run_req_num = 0, run_req_run_time = 0;
+
+  uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
+  // =============================
+
+  uint64_t last_seq_fail = 0;
+  uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
+           last_num_commit = 0;
+  uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
+  uint64_t last_client_call = 0, last_socket_recv = 0;
+  uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
+  uint64_t last_send_broad_cast_msg_per_rep = 0;
+  uint64_t last_server_call = 0, last_server_process = 0;
+  uint64_t last_total_request = 0, last_total_geo_request = 0,
+           last_geo_request = 0;
+  uint64_t time = 0;
+
+  while (!stop_) {
+    sleep(monitor_sleep_time_);
+    time += monitor_sleep_time_;
+    // Snapshot all atomics once per interval so the log line is consistent.
+    seq_fail = seq_fail_;
+    socket_recv = socket_recv_;
+    client_call = client_call_;
+    num_client_req = num_client_req_;
+    num_propose = num_propose_;
+    num_prepare = num_prepare_;
+    num_commit = num_commit_;
+    pending_execute = pending_execute_;
+    execute = execute_;
+    execute_done = execute_done_;
+    broad_cast_msg = broad_cast_msg_;
+    send_broad_cast_msg = send_broad_cast_msg_;
+    send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
+    server_call = server_call_;
+    server_process = server_process_;
+    seq_gap = seq_gap_;
+    total_request = total_request_;
+    total_geo_request = total_geo_request_;
+    geo_request = geo_request_;
+
+    run_req_num = run_req_num_;
+    run_req_run_time = run_req_run_time_;
+
+    LOG(ERROR) << "=========== monitor =========\n"
+               << "server call:" << server_call - last_server_call
+               << " server process:" << server_process - last_server_process
+               << " socket recv:" << socket_recv - last_socket_recv
+               << " "
+                  "client call:"
+               << client_call - last_client_call
+               << " "
+                  "client req:"
+               << num_client_req - last_num_client_req
+               << " "
+                  "broad_cast:"
+               << broad_cast_msg - last_broad_cast_msg
+               << " "
+                  "send broad_cast:"
+               << send_broad_cast_msg - last_send_broad_cast_msg
+               << " "
+                  "per send broad_cast:"
+               << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
+               << " "
+                  "propose:"
+               << num_propose - last_num_propose
+               << " "
+                  "prepare:"
+               << (num_prepare - last_num_prepare)
+               << " "
+                  "commit:"
+               << (num_commit - last_num_commit)
+               << " "
+                  "pending execute:"
+               << pending_execute - last_pending_execute
+               << " "
+                  "execute:"
+               << execute - last_execute
+               << " "
+                  "execute done:"
+               << execute_done - last_execute_done << " seq gap:" << seq_gap
+               << " total request:" << total_request - last_total_request
+               // NOTE(review): the literal divisor 5 assumes the default 5s
+               // sleep; presumably this should be monitor_sleep_time_ - TODO
+               // confirm (also applies to "total geo request per" below).
+               << " txn:" << (total_request - last_total_request) / 5
+               << " total geo request:"
+               << total_geo_request - last_total_geo_request
+               << " total geo request per:"
+               << (total_geo_request - last_total_geo_request) / 5
+               << " geo request:" << (geo_request - last_geo_request)
+               << " "
+                  "seq fail:"
+               << seq_fail - last_seq_fail << " time:" << time
+               << " "
+                  "\n--------------- monitor ------------";
+    if (run_req_num - last_run_req_num > 0) {
+      // Average client-observed latency in seconds (run_time is nanoseconds).
+      LOG(ERROR) << "  req client latency:"
+                 << static_cast<double>(run_req_run_time -
+                                        last_run_req_run_time) /
+                        (run_req_num - last_run_req_num) / 1000000000.0;
+    }
+
+    last_seq_fail = seq_fail;
+    last_socket_recv = socket_recv;
+    last_client_call = client_call;
+    last_num_client_req = num_client_req;
+    last_num_propose = num_propose;
+    last_num_prepare = num_prepare;
+    last_num_commit = num_commit;
+    last_pending_execute = pending_execute;
+    last_execute = execute;
+    last_execute_done = execute_done;
+
+    last_broad_cast_msg = broad_cast_msg;
+    last_send_broad_cast_msg = send_broad_cast_msg;
+    last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
+
+    last_server_call = server_call;
+    last_server_process = server_process;
+
+    last_run_req_num = run_req_num;
+    last_run_req_run_time = run_req_run_time;
+    last_total_request = total_request;
+    last_total_geo_request = total_geo_request;
+    last_geo_request = geo_request;
+  }
+}
+
+// Counts a client RPC entry; mirrored to Prometheus when configured.
+void Stats::IncClientCall() {
+  if (prometheus_) {
+    prometheus_->Inc(CLIENT_CALL, 1);
+  }
+  client_call_++;
+}
+
+// Counts a received client request.
+void Stats::IncClientRequest() {
+  if (prometheus_) {
+    prometheus_->Inc(CLIENT_REQ, 1);
+  }
+  num_client_req_++;
+}
+
+// Counts a proposal (pre-prepare sent by the primary).
+void Stats::IncPropose() {
+  if (prometheus_) {
+    prometheus_->Inc(PROPOSE, 1);
+  }
+  num_propose_++;
+}
+
+// Legacy variant: counts a prepare message and, if resview is on, records
+// its arrival time on the shared transaction_summary_ (shard-0 lock).
+void Stats::IncPrepare() {
+  if (prometheus_) {
+    prometheus_->Inc(PREPARE, 1);
+  }
+  num_prepare_++;
+  // Old method without seq - deprecated
+  if (enable_resview) {
+    std::lock_guard<std::mutex> lock(
+        telemetry_mutex_[0]);  // Use first shard for old code
+    transaction_summary_.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Per-sequence variant: counts a prepare message and appends its arrival
+// time to the telemetry entry for `seq`, creating the entry from the static
+// template if absent.
+// NOTE(review): the create-if-absent body is duplicated in four functions in
+// this file; consider an unlocked shared helper.
+void Stats::IncPrepare(uint64_t seq) {
+  if (prometheus_) {
+    prometheus_->Inc(PREPARE, 1);
+  }
+  num_prepare_++;
+  if (enable_resview) {
+    size_t shard = GetShardIndex(seq);
+    std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+    auto& map = transaction_telemetry_map_[shard];
+    auto it = map.find(seq);
+    if (it == map.end()) {
+      TransactionTelemetry telemetry;
+      telemetry.replica_id = static_telemetry_info_.replica_id;
+      telemetry.primary_id = static_telemetry_info_.primary_id;
+      telemetry.ip = static_telemetry_info_.ip;
+      telemetry.port = static_telemetry_info_.port;
+      telemetry.ext_cache_hit_ratio_ =
+          static_telemetry_info_.ext_cache_hit_ratio_;
+      telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+      telemetry.level_db_approx_mem_size_ =
+          static_telemetry_info_.level_db_approx_mem_size_;
+      telemetry.txn_number = seq;
+      it = map.emplace(seq, std::move(telemetry)).first;
+    }
+
+    it->second.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Legacy variant: counts a commit message and, if resview is on, records its
+// arrival time on the shared transaction_summary_ (shard-0 lock).
+void Stats::IncCommit() {
+  if (prometheus_) {
+    prometheus_->Inc(COMMIT, 1);
+  }
+  num_commit_++;
+  // Old method without seq - deprecated
+  if (enable_resview) {
+    std::lock_guard<std::mutex> lock(
+        telemetry_mutex_[0]);  // Use first shard for old code
+    transaction_summary_.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Per-sequence variant: counts a commit message and appends its arrival time
+// to the telemetry entry for `seq`, creating the entry from the static
+// template if absent (same duplicated init pattern as IncPrepare(seq)).
+void Stats::IncCommit(uint64_t seq) {
+  if (prometheus_) {
+    prometheus_->Inc(COMMIT, 1);
+  }
+  num_commit_++;
+  if (enable_resview) {
+    size_t shard = GetShardIndex(seq);
+    std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+    auto& map = transaction_telemetry_map_[shard];
+    auto it = map.find(seq);
+    if (it == map.end()) {
+      TransactionTelemetry telemetry;
+      telemetry.replica_id = static_telemetry_info_.replica_id;
+      telemetry.primary_id = static_telemetry_info_.primary_id;
+      telemetry.ip = static_telemetry_info_.ip;
+      telemetry.port = static_telemetry_info_.port;
+      telemetry.ext_cache_hit_ratio_ =
+          static_telemetry_info_.ext_cache_hit_ratio_;
+      telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+      telemetry.level_db_approx_mem_size_ =
+          static_telemetry_info_.level_db_approx_mem_size_;
+      telemetry.txn_number = seq;
+      it = map.emplace(seq, std::move(telemetry)).first;
+    }
+
+    it->second.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// --- Simple atomic counter updates; Prometheus mirroring where noted. ---
+
+void Stats::IncPendingExecute() { pending_execute_++; }
+
+void Stats::IncExecute() { execute_++; }
+
+// Counts a fully executed request.
+void Stats::IncExecuteDone() {
+  if (prometheus_) {
+    prometheus_->Inc(EXECUTE, 1);
+  }
+  execute_done_++;
+}
+
+// Counts a broadcast message enqueued.
+void Stats::BroadCastMsg() {
+  if (prometheus_) {
+    prometheus_->Inc(BROAD_CAST, 1);
+  }
+  broad_cast_msg_++;
+}
+
+// Adds `num` messages actually sent out as broadcast.
+void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
+
+void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
+
+// Counts a sequence-assignment failure.
+void Stats::SeqFail() { seq_fail_++; }
+
+// Adds `num` executed transactions to the running total.
+void Stats::IncTotalRequest(uint32_t num) {
+  if (prometheus_) {
+    prometheus_->Inc(NUM_EXECUTE_TX, num);
+  }
+  total_request_ += num;
+}
+
+void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
+
+void Stats::IncGeoRequest() { geo_request_++; }
+
+// Network in -> worker hand-off counters.
+void Stats::ServerCall() {
+  if (prometheus_) {
+    prometheus_->Inc(SERVER_CALL_NAME, 1);
+  }
+  server_call_++;
+}
+
+void Stats::ServerProcess() {
+  if (prometheus_) {
+    prometheus_->Inc(SERVER_PROCESS, 1);
+  }
+  server_process_++;
+}
+
+// Records the current sequence gap (gauge, not a counter).
+void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
+
+// Accumulates one client request's latency (run_time in nanoseconds; see
+// the latency log in MonitorGlobal()).
+void Stats::AddLatency(uint64_t run_time) {
+  run_req_num_++;
+  run_req_run_time_ += run_time;
+}
+
+// Lazily creates the Prometheus exporter; counters above mirror to it once
+// this has been called.
+void Stats::SetPrometheus(const std::string& prometheus_address) {
+  prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
+}
+
+}  // namespace resdb
\ No newline at end of file
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index c0ad30a7..4a0462bd 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -17,151 +17,255 @@
  * under the License.
  */
 
- #pragma once
-
- #include <crow.h>
- 
- #include <chrono>
- #include <future>
- #include <nlohmann/json.hpp>
- 
- #include "boost/asio.hpp"
- #include "boost/beast.hpp"
- #include "platform/common/network/tcp_socket.h"
- #include "platform/proto/resdb.pb.h"
- #include "platform/statistic/prometheus_handler.h"
- #include "proto/kv/kv.pb.h"
- #include "sys/resource.h"
- 
- namespace asio = boost::asio;
- namespace beast = boost::beast;
- using tcp = asio::ip::tcp;
- 
- namespace resdb {
- 
- struct VisualData {
-   // Set when initializing
-   int replica_id;
-   int primary_id;
-   std::string ip;
-   int port;
- 
-   // Set when new txn is received
-   int txn_number;
-   std::vector<std::string> txn_command;
-   std::vector<std::string> txn_key;
-   std::vector<std::string> txn_value;
- 
-   // Request state if primary_id==replica_id, pre_prepare state otherwise
-   std::chrono::system_clock::time_point request_pre_prepare_state_time;
-   std::chrono::system_clock::time_point prepare_state_time;
-   std::vector<std::chrono::system_clock::time_point>
-       prepare_message_count_times_list;
-   std::chrono::system_clock::time_point commit_state_time;
-   std::vector<std::chrono::system_clock::time_point>
-       commit_message_count_times_list;
-   std::chrono::system_clock::time_point execution_time;
- 
-   // Storage Engine Stats
-   double ext_cache_hit_ratio_;
-   std::string level_db_stats_;
-   std::string level_db_approx_mem_size_;
- 
-   // process stats
-   struct rusage process_stats_;
- };
- 
- class Stats {
-  public:
-   static Stats* GetGlobalStats(int sleep_seconds = 5);
- 
-   void Stop();
- 
-   void RetrieveProgress();
-   void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
-                 bool faulty_flag);
-   void SetPrimaryId(int primary_id);
-   void SetStorageEngineMetrics(double ext_cache_hit_ratio,
-                                std::string level_db_stats,
-                                std::string level_db_approx_mem_size);
-   void RecordStateTime(std::string state);
-   void GetTransactionDetails(BatchUserRequest batch_request);
-   void SendSummary();
-   void CrowRoute();
-   bool IsFaulty();
-   void ChangePrimary(int primary_id);
- 
-   void AddLatency(uint64_t run_time);
- 
-   void Monitor();
-   void MonitorGlobal();
- 
-   void IncSocketRecv();
- 
-   void IncClientCall();
- 
-   void IncClientRequest();
-   void IncPropose();
-   void IncPrepare();
-   void IncCommit();
-   void IncPendingExecute();
-   void IncExecute();
-   void IncExecuteDone();
- 
-   void BroadCastMsg();
-   void SendBroadCastMsg(uint32_t num);
-   void SendBroadCastMsgPerRep();
-   void SeqFail();
-   void IncTotalRequest(uint32_t num);
-   void IncTotalGeoRequest(uint32_t num);
-   void IncGeoRequest();
- 
-   void SeqGap(uint64_t seq_gap);
-   // Network in->worker
-   void ServerCall();
-   void ServerProcess();
-   void SetPrometheus(const std::string& prometheus_address);
- 
-  protected:
-   Stats(int sleep_time = 5);
-   ~Stats();
- 
-  private:
-   std::string monitor_port_ = "default";
-   std::string name_;
-   std::atomic<int> num_call_, run_call_;
-   std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
-   std::thread thread_;
-   std::atomic<bool> begin_;
-   std::atomic<bool> stop_;
-   std::condition_variable cv_;
-   std::mutex mutex_;
- 
-   std::thread global_thread_;
-   std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
-       num_commit_, pending_execute_, execute_, execute_done_;
-   std::atomic<uint64_t> client_call_, socket_recv_;
-   std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
-       send_broad_cast_msg_per_rep_;
-   std::atomic<uint64_t> seq_fail_;
-   std::atomic<uint64_t> server_call_, server_process_;
-   std::atomic<uint64_t> run_req_num_;
-   std::atomic<uint64_t> run_req_run_time_;
-   std::atomic<uint64_t> seq_gap_;
-   std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
-   int monitor_sleep_time_ = 5;  // default 5s.
- 
-   std::thread crow_thread_;
-   bool enable_resview;
-   bool enable_faulty_switch_;
-   VisualData transaction_summary_;
-   std::atomic<bool> make_faulty_;
-   std::atomic<uint64_t> prev_num_prepare_;
-   std::atomic<uint64_t> prev_num_commit_;
-   nlohmann::json summary_json_;
-   nlohmann::json consensus_history_;
- 
-   std::unique_ptr<PrometheusHandler> prometheus_;
- };
- 
- }  // namespace resdb
\ No newline at end of file
+#pragma once
+
+#include <crow.h>
+
+#include <chrono>
+#include <future>
+#include <mutex>
+#include <nlohmann/json.hpp>
+#include <unordered_map>
+
+#include "boost/asio.hpp"
+#include "boost/beast.hpp"
+#include "platform/common/network/tcp_socket.h"
+#include "platform/proto/resdb.pb.h"
+#include "platform/statistic/prometheus_handler.h"
+#include "proto/kv/kv.pb.h"
+#include "sys/resource.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
+
+namespace resdb {
+
+// Per-transaction telemetry data with automatic JSON serialization
+// Per-transaction telemetry data with automatic JSON serialization
+struct TransactionTelemetry {
+  // Static info (set once, copied to each transaction)
+  int replica_id = 0;
+  int primary_id = 0;
+  std::string ip;
+  int port = -1;
+
+  // Transaction-specific data
+  uint64_t txn_number = 0;
+  std::vector<std::string> txn_command;
+  std::vector<std::string> txn_key;
+  std::vector<std::string> txn_value;
+
+  // Timestamps; time_point::min() is the "not yet recorded" sentinel.
+  std::chrono::system_clock::time_point request_pre_prepare_state_time;
+  std::chrono::system_clock::time_point prepare_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      prepare_message_count_times_list;
+  std::chrono::system_clock::time_point commit_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      commit_message_count_times_list;
+  std::chrono::system_clock::time_point execution_time;
+
+  // Storage Engine Stats
+  double ext_cache_hit_ratio_ = 0.0;
+  std::string level_db_stats_;
+  std::string level_db_approx_mem_size_;
+
+  // Default constructor with min timestamps
+  TransactionTelemetry()
+      : request_pre_prepare_state_time(
+            std::chrono::system_clock::time_point::min()),
+        prepare_state_time(std::chrono::system_clock::time_point::min()),
+        commit_state_time(std::chrono::system_clock::time_point::min()),
+        execution_time(std::chrono::system_clock::time_point::min()) {}
+
+  // Convert to JSON.
+  // NOTE(review): unrecorded timestamps serialize as time_point::min()'s raw
+  // epoch count (a large negative number); confirm the dashboard consuming
+  // this JSON treats that value as "missing".
+  nlohmann::json to_json() const {
+    nlohmann::json json;
+    json["replica_id"] = replica_id;
+    json["ip"] = ip;
+    json["port"] = port;
+    json["primary_id"] = primary_id;
+    json["propose_pre_prepare_time"] =
+        request_pre_prepare_state_time.time_since_epoch().count();
+    json["prepare_time"] = prepare_state_time.time_since_epoch().count();
+    json["commit_time"] = commit_state_time.time_since_epoch().count();
+    json["execution_time"] = execution_time.time_since_epoch().count();
+
+    json["prepare_message_timestamps"] = nlohmann::json::array();
+    for (const auto& ts : prepare_message_count_times_list) {
+      json["prepare_message_timestamps"].push_back(
+          ts.time_since_epoch().count());
+    }
+
+    json["commit_message_timestamps"] = nlohmann::json::array();
+    for (const auto& ts : commit_message_count_times_list) {
+      json["commit_message_timestamps"].push_back(
+          ts.time_since_epoch().count());
+    }
+
+    json["txn_number"] = txn_number;
+    json["txn_commands"] = txn_command;
+    json["txn_keys"] = txn_key;
+    json["txn_values"] = txn_value;
+    json["ext_cache_hit_ratio"] = ext_cache_hit_ratio_;
+    json["level_db_stats"] = level_db_stats_;
+    json["level_db_approx_mem_size"] = level_db_approx_mem_size_;
+
+    return json;
+  }
+};
+
// Keep old VisualData for backwards compatibility if needed.
// Fix: the original left every scalar member (and process_stats_)
// uninitialized, so a default-constructed VisualData read indeterminate
// values; all members now have deterministic zero defaults, matching
// TransactionTelemetry's initialized style.
struct VisualData {
  // Set when initializing
  int replica_id = 0;
  int primary_id = 0;
  std::string ip;
  int port = 0;

  // Set when new txn is received
  int txn_number = 0;
  std::vector<std::string> txn_command;
  std::vector<std::string> txn_key;
  std::vector<std::string> txn_value;

  // Request state if primary_id==replica_id, pre_prepare state otherwise.
  // Default-constructed time_points are the clock epoch.
  std::chrono::system_clock::time_point request_pre_prepare_state_time;
  std::chrono::system_clock::time_point prepare_state_time;
  std::vector<std::chrono::system_clock::time_point>
      prepare_message_count_times_list;
  std::chrono::system_clock::time_point commit_state_time;
  std::vector<std::chrono::system_clock::time_point>
      commit_message_count_times_list;
  std::chrono::system_clock::time_point execution_time;

  // Storage Engine Stats
  double ext_cache_hit_ratio_ = 0.0;
  std::string level_db_stats_;
  std::string level_db_approx_mem_size_;

  // process stats (value-initialized, i.e. zeroed)
  struct rusage process_stats_{};
};
+
// Process-wide statistics collector and per-sequence telemetry tracker.
// Counters are std::atomic so hot paths can bump them without locking;
// per-sequence telemetry lives in a sharded hash map with one mutex per
// shard to limit contention between consensus threads.
class Stats {
 public:
  // Singleton accessor; sleep_seconds is the monitor interval (presumably
  // only honored by the first, constructing call — confirm in stats.cpp).
  static Stats* GetGlobalStats(int sleep_seconds = 5);

  // Signals the background threads to shut down.
  void Stop();

  void RetrieveProgress();
  // Replica identity/flags used as static info for telemetry entries.
  void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
                bool faulty_flag);
  void SetPrimaryId(int primary_id);
  // Latest storage-engine metrics snapshot attached to outgoing summaries.
  void SetStorageEngineMetrics(double ext_cache_hit_ratio,
                               std::string level_db_stats,
                               std::string level_db_approx_mem_size);
  // Timestamps a consensus state (e.g. "request"); the seq overload records
  // it on that sequence's telemetry entry instead of the shared summary.
  void RecordStateTime(std::string state);
  void RecordStateTime(uint64_t seq, std::string state);
  void GetTransactionDetails(BatchUserRequest batch_request);
  // Publishes collected telemetry; the seq overload finalizes and emits a
  // single sequence's record.
  void SendSummary();
  void SendSummary(uint64_t seq);
  // HTTP routes for the resview visualization (see crow_thread_ below).
  void CrowRoute();
  bool IsFaulty();
  void ChangePrimary(int primary_id);

  void AddLatency(uint64_t run_time);

  // Periodic reporting loops (local and global counters respectively).
  void Monitor();
  void MonitorGlobal();

  void IncSocketRecv();

  void IncClientCall();

  // Consensus-phase counters. The seq overloads additionally record a
  // per-sequence timestamp in the sharded telemetry map.
  void IncClientRequest();
  void IncPropose();
  void IncPrepare();
  void IncPrepare(uint64_t seq);
  void IncCommit();
  void IncCommit(uint64_t seq);
  void IncPendingExecute();
  void IncExecute();
  void IncExecuteDone();

  void BroadCastMsg();
  void SendBroadCastMsg(uint32_t num);
  void SendBroadCastMsgPerRep();
  void SeqFail();
  void IncTotalRequest(uint32_t num);
  void IncTotalGeoRequest(uint32_t num);
  void IncGeoRequest();

  void SeqGap(uint64_t seq_gap);
  // Network in->worker
  void ServerCall();
  void ServerProcess();
  void SetPrometheus(const std::string& prometheus_address);

 protected:
  // Construction goes through GetGlobalStats(); not publicly constructible.
  Stats(int sleep_time = 5);
  ~Stats();

 private:
  std::string monitor_port_ = "default";
  std::string name_;
  std::atomic<int> num_call_, run_call_;
  std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
  std::thread thread_;  // monitor thread (presumably runs Monitor())
  std::atomic<bool> begin_;
  std::atomic<bool> stop_;  // set by Stop() to end the background loops
  std::condition_variable cv_;
  std::mutex mutex_;

  std::thread global_thread_;  // presumably runs MonitorGlobal()
  std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
      num_commit_, pending_execute_, execute_, execute_done_;
  std::atomic<uint64_t> client_call_, socket_recv_;
  std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
      send_broad_cast_msg_per_rep_;
  std::atomic<uint64_t> seq_fail_;
  std::atomic<uint64_t> server_call_, server_process_;
  std::atomic<uint64_t> run_req_num_;
  std::atomic<uint64_t> run_req_run_time_;
  std::atomic<uint64_t> seq_gap_;
  std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
  int monitor_sleep_time_ = 5;  // default 5s.

  std::thread crow_thread_;  // presumably runs CrowRoute()
  bool enable_resview;
  bool enable_faulty_switch_;

  // Old single transaction_summary_ for backwards compatibility
  // TODO: Remove after full migration to sharded map
  VisualData transaction_summary_;

  // Sharded map solution for per-transaction telemetry: seq -> telemetry,
  // shard chosen by GetShardIndex(seq), one mutex guarding each shard.
  static const int kTelemetryShards = 256;
  std::unordered_map<uint64_t, TransactionTelemetry>
      transaction_telemetry_map_[kTelemetryShards];
  std::mutex telemetry_mutex_[kTelemetryShards];
  TransactionTelemetry
      static_telemetry_info_;  // Static info to copy to new transactions
  mutable std::unordered_set<uint64_t>
      finalized_sequences_;  // Sequences that have been sent (don't create new
                             // entries)
  mutable std::mutex finalized_mutex_;  // Protect finalized_sequences_

  // Helper to get shard index
  size_t GetShardIndex(uint64_t seq) const;
  TransactionTelemetry& GetOrCreateTelemetry(uint64_t seq);
  bool IsFinalized(uint64_t seq) const;
  void MarkFinalized(uint64_t seq);

  std::atomic<bool> make_faulty_;
  // Previous-interval snapshots, presumably for rate deltas — confirm.
  std::atomic<uint64_t> prev_num_prepare_;
  std::atomic<uint64_t> prev_num_commit_;
  nlohmann::json summary_json_;
  nlohmann::json consensus_history_;
  std::mutex consensus_history_mutex_;  // Protect consensus_history_ access

  std::unique_ptr<PrometheusHandler> prometheus_;
};
+
+}  // namespace resdb
\ No newline at end of file

Reply via email to