This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch demo-final
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit a3c1477e3aa84fffb6b6d2e509a5dd40cb498d31
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 05:12:34 2026 +0000

    reverting stats.cpp to base implemenation. TODO: per request telemetry 
tracking, currently gets overwritten leading to incorrect telemetry data
---
 .../consensus/execution/transaction_executor.cpp   |    3 -
 platform/consensus/ordering/pbft/commitment.cpp    |    2 -
 platform/networkstrate/service_network.cpp         |   11 -
 platform/statistic/BUILD                           |    7 +-
 platform/statistic/stats.cpp                       | 1531 ++++++++------------
 platform/statistic/stats.h                         |  322 ++--
 6 files changed, 735 insertions(+), 1141 deletions(-)

diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index 71c228a0..bb7cf6be 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -303,7 +303,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
   std::unique_ptr<BatchUserResponse> response;
   global_stats_->GetTransactionDetails(*batch_request_p);
   uint64_t seq = request->seq();
-  global_stats_->RecordExecuteStart(seq);
   if (transaction_manager_ && need_execute) {
     if (execute_thread_num_ == 1) {
       response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -356,7 +355,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
   if (response == nullptr) {
     response = std::make_unique<BatchUserResponse>();
   }
-  global_stats_->RecordExecuteEnd(seq);
   global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
   response->set_proxy_id(batch_request_p->proxy_id());
   response->set_createtime(batch_request_p->createtime() +
@@ -366,7 +364,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
   response->set_seq(request->seq());
   if (post_exec_func_) {
     post_exec_func_(std::move(request), std::move(response));
-    global_stats_->RecordResponseSent(seq);
   }
 
   global_stats_->IncExecuteDone();
diff --git a/platform/consensus/ordering/pbft/commitment.cpp 
b/platform/consensus/ordering/pbft/commitment.cpp
index f8ab3d9d..5a970dd6 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -266,7 +266,6 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> 
context,
   global_stats_->IncPrepare();
   uint64_t seq = request->seq();
   int sender_id = request->sender_id();
-  global_stats_->RecordPrepareRecv(seq, sender_id);
   std::unique_ptr<Request> commit_request = resdb::NewRequest(
       Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
   commit_request->mutable_data_signature()->Clear();
@@ -311,7 +310,6 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> 
context,
                                              std::move(request));
   }
   global_stats_->IncCommit();
-  global_stats_->RecordCommitRecv(seq, sender_id);
   // Add request to message_manager.
   // If it has received enough same requests(2f+1), message manager will
   // commit the request.
diff --git a/platform/networkstrate/service_network.cpp 
b/platform/networkstrate/service_network.cpp
index 0bc60579..3a05afda 100644
--- a/platform/networkstrate/service_network.cpp
+++ b/platform/networkstrate/service_network.cpp
@@ -74,17 +74,6 @@ void ServiceNetwork::AcceptorHandler(const char* buffer, 
size_t data_len) {
     item->data = std::move(sub_request_info);
     // LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data
     // len:"<<item->data->data_len;
-    
-    // Try to extract seq from request for timeline tracking
-    try {
-      Request request;
-      if (request.ParseFromArray(sub_data.data(), sub_data.size())) {
-        global_stats_->RecordNetworkRecv(request.seq());
-      }
-    } catch (...) {
-      // Ignore parse errors, seq extraction is optional
-    }
-    
     global_stats_->ServerCall();
     input_queue_.Push(std::move(item));
   }
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 042668e1..81cafe7f 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -26,10 +26,6 @@ cc_library(
     name = "stats",
     srcs = ["stats.cpp"],
     hdrs = ["stats.h"],
-    copts = select({
-        "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"],
-        "//conditions:default": [],
-    }),
     deps = [
         ":prometheus_handler",
         "//common:asio",
@@ -42,7 +38,6 @@ cc_library(
         "//proto/kv:kv_cc_proto",
         "//third_party:crow",
         "//third_party:prometheus",
-        "//third_party:leveldb",
     ],
 )
 
@@ -59,4 +54,4 @@ cc_library(
 cc_binary(
     name = "set_random_data",
     srcs = ["set_random_data.cpp"],
-)
+)
\ No newline at end of file
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 5b4e3aec..1c511b3b 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -17,948 +17,589 @@
  * under the License.
  */
 
-#include "platform/statistic/stats.h"
-
-#include <glog/logging.h>
-
-#include <ctime>
-#include <sstream>
-#include <fstream>
-#include <sys/stat.h>
-#include <errno.h>
-
-#include "common/utils/utils.h"
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/write_batch.h"
-#include "proto/kv/kv.pb.h"
-
-namespace asio = boost::asio;
-namespace beast = boost::beast;
-using tcp = asio::ip::tcp;
-
-namespace resdb {
-
-std::mutex g_mutex;
-Stats* Stats::GetGlobalStats(int seconds) {
-  std::unique_lock<std::mutex> lk(g_mutex);
-  static Stats stats(seconds);
-  return &stats;
-}  // gets a singelton instance of Stats Class
-
-Stats::Stats(int sleep_time) {
-  monitor_sleep_time_ = sleep_time;
-#ifdef TEST_MODE
-  monitor_sleep_time_ = 1;
-#endif
-  num_call_ = 0;
-  num_commit_ = 0;
-  run_time_ = 0;
-  run_call_ = 0;
-  run_call_time_ = 0;
-  server_call_ = 0;
-  server_process_ = 0;
-  run_req_num_ = 0;
-  run_req_run_time_ = 0;
-  seq_gap_ = 0;
-  total_request_ = 0;
-  total_geo_request_ = 0;
-  geo_request_ = 0;
-
-  stop_ = false;
-  begin_ = false;
-
-  socket_recv_ = 0;
-  broad_cast_msg_ = 0;
-  send_broad_cast_msg_ = 0;
-
-  prometheus_ = nullptr;
-  global_thread_ =
-      std::thread(&Stats::MonitorGlobal, this);  // pass by reference
-
-  transaction_summary_.port = -1;
-
-  // Setup websocket here
-  make_faulty_.store(false);
-  transaction_summary_.request_pre_prepare_state_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.prepare_state_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.commit_state_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.execution_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.txn_number = 0;
-}
-
-void Stats::Stop() { stop_ = true; }
-
-Stats::~Stats() {
-  stop_ = true;
-  if (global_thread_.joinable()) {
-    global_thread_.join();
-  }
-  if (enable_resview && crow_thread_.joinable()) {
-    crow_thread_.join();
-  }
-}
-
-int64_t GetRSS() {
-  int64_t rss = 0;
-  FILE* fp = NULL;
-  if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
-    return 0;
-  }
-
-  uint64_t size, resident, share, text, lib, data, dt;
-  if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, 
&text,
-             &lib, &data, &dt) != 7) {
-    fclose(fp);
-    return 0;
-  }
-  fclose(fp);
-
-  int64_t page_size = sysconf(_SC_PAGESIZE);
-  rss = resident * page_size;
-
-  // Convert to MB
-  rss = rss / (1024 * 1024);
-
-  return rss;
-}
-
-void Stats::CrowRoute() {
-  crow::SimpleApp app;
-  while (!stop_) {
-    try {
-      //Deprecated
-      CROW_ROUTE(app, "/consensus_data")
-          .methods("GET"_method)(
-              [this](const crow::request& req, crow::response& res) {
-                LOG(ERROR) << "API 1";
-                res.set_header("Access-Control-Allow-Origin", "*");
-                res.set_header("Access-Control-Allow-Methods",
-                               "GET, POST, OPTIONS");
-                res.set_header("Access-Control-Allow-Headers",
-                               "Content-Type, Authorization");
-
-                int limit = 10; 
-                int page = 1;
-                
-                std::string url = req.url;
-                size_t query_pos = url.find('?');
-                if (query_pos != std::string::npos) {
-                  std::string query_string = url.substr(query_pos + 1);
-                  std::istringstream iss(query_string);
-                  std::string param;
-                  while (std::getline(iss, param, '&')) {
-                    size_t eq_pos = param.find('=');
-                    if (eq_pos != std::string::npos) {
-                      std::string key = param.substr(0, eq_pos);
-                      std::string value = param.substr(eq_pos + 1);
-                      if (key == "limit") {
-                        try {
-                          limit = std::stoi(value);
-                          if (limit <= 0) limit = 10;
-                        } catch (...) {
-                          limit = 10;
-                        }
-                      } else if (key == "page") {
-                        try {
-                          page = std::stoi(value);
-                          if (page <= 0) page = 1;
-                        } catch (...) {
-                          page = 1;
-                        }
-                      }
-                    }
-                  }
-                }
-
-                nlohmann::json result;
-                int total_count = 0;
-                int offset = (page - 1) * limit;
-                int current_index = 0;
-                int items_collected = 0;
-
-                if (summary_db_) {
-                  std::lock_guard<std::mutex> lock(summary_db_mutex_);
-                  leveldb::Iterator* it =
-                      summary_db_->NewIterator(leveldb::ReadOptions());
-                  
-                  // Count only non-timeline entries
-                  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-                    std::string key = it->key().ToString();
-                    if (key.find("timeline_") != 0) {
-                      total_count++;
-                    }
-                  }
-                  
-                  // Iterate and collect only non-timeline entries
-                  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-                    std::string key = it->key().ToString();
-                    // Skip timeline entries
-                    if (key.find("timeline_") == 0) {
-                      continue;
-                    }
-                    
-                    if (current_index >= offset && items_collected < limit) {
-                      try {
-                        result[key] =
-                            nlohmann::json::parse(it->value().ToString());
-                        items_collected++;
-                      } catch (...) {
-                        res.code = 500;
-                        result["error"] = "Failed to parse transaction data";
-                        delete it;
-                        res.body = result.dump();
-                        res.end();
-                        return;
-                      }
-                    }
-                    current_index++;
-                  }
-                  delete it;
-                }
-
-                res.body = result.dump();
-                res.end();
-              });
-      CROW_ROUTE(app, "/get_status")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res) {
-            LOG(ERROR) << "API 2";
-            res.set_header("Access-Control-Allow-Origin",
-                           "*");  // Allow requests from any origin
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");  // Specify allowed methods
-            res.set_header(
-                "Access-Control-Allow-Headers",
-                "Content-Type, Authorization");  // Specify allowed headers
-
-            // Send your response
-            res.body = IsFaulty() ? "Faulty" : "Not Faulty";
-            res.end();
-          });
-      CROW_ROUTE(app, "/make_faulty")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res) {
-            LOG(ERROR) << "API 3";
-            res.set_header("Access-Control-Allow-Origin",
-                           "*");  // Allow requests from any origin
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");  // Specify allowed methods
-            res.set_header(
-                "Access-Control-Allow-Headers",
-                "Content-Type, Authorization");  // Specify allowed headers
-
-            res.body = "Not Enabled";
-            // Send your response
-            if (enable_faulty_switch_) {
-              make_faulty_.store(!make_faulty_.load());
-              res.body = "Success";
-            }
-            res.end();
-          });
-      CROW_ROUTE(app, "/transaction_data")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res) {
-            LOG(ERROR) << "API 4";
-            res.set_header("Access-Control-Allow-Origin",
-                           "*");  // Allow requests from any origin
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");  // Specify allowed methods
-            res.set_header(
-                "Access-Control-Allow-Headers",
-                "Content-Type, Authorization");  // Specify allowed headers
-
-            nlohmann::json mem_view_json;
-            int status =
-                getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
-            if (status == 0) {
-              mem_view_json["resident_set_size"] = GetRSS();
-              mem_view_json["max_resident_set_size"] =
-                  transaction_summary_.process_stats_.ru_maxrss;
-              mem_view_json["num_reads"] =
-                  transaction_summary_.process_stats_.ru_inblock;
-              mem_view_json["num_writes"] =
-                  transaction_summary_.process_stats_.ru_oublock;
-            }
-
-            mem_view_json["ext_cache_hit_ratio"] =
-                transaction_summary_.ext_cache_hit_ratio_;
-            mem_view_json["level_db_stats"] =
-                transaction_summary_.level_db_stats_;
-            mem_view_json["level_db_approx_mem_size"] =
-                transaction_summary_.level_db_approx_mem_size_;
-            res.body = mem_view_json.dump();
-            mem_view_json.clear();
-            res.end();
-          });
-      CROW_ROUTE(app, "/consensus_data/<int>")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res, int txn_number) {
-            LOG(ERROR) << "API 5: Get transaction " << txn_number;
-            res.set_header("Access-Control-Allow-Origin",
-                           "*");  // Allow requests from any origin
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");  // Specify allowed methods
-            res.set_header(
-                "Access-Control-Allow-Headers",
-                "Content-Type, Authorization");  // Specify allowed headers
-
-            nlohmann::json result;
-            if (summary_db_) {
-              std::lock_guard<std::mutex> lock(summary_db_mutex_);
-              std::string txn_key = std::to_string(txn_number);
-              std::string value;
-              leveldb::Status status =
-                  summary_db_->Get(leveldb::ReadOptions(), txn_key, &value);
-
-              if (status.ok() && !value.empty()) {
-                try {
-                  result[txn_key] = nlohmann::json::parse(value);
-                } catch (...) {
-                  res.code = 500;
-                  result["error"] = "Failed to parse transaction data";
-                }
-              } else {
-                res.code = 404;
-                result["error"] = "Transaction not found";
-                result["txn_number"] = txn_number;
-              }
-            } else {
-              res.code = 503;
-              result["error"] = "Storage not initialized";
-            }
-
-            res.body = result.dump();
-            res.end();
-          });
-
-      CROW_ROUTE(app, "/transaction_timeline/<int>")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res, int txn_number) {
-            LOG(ERROR) << "API 6: Get transaction timeline " << txn_number;
-            res.set_header("Access-Control-Allow-Origin", "*");
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");
-            res.set_header("Access-Control-Allow-Headers",
-                           "Content-Type, Authorization");
-
-            nlohmann::json result;
-            result["txn_number"] = txn_number;
-            
-            if (!summary_db_) {
-              res.code = 503;
-              result["error"] = "Storage not initialized";
-              res.body = result.dump();
-              res.end();
-              return;
-            }
-            
-            std::lock_guard<std::mutex> lock(summary_db_mutex_);
-            std::string txn_key = std::to_string(txn_number);
-            std::string timeline_key = "timeline_" + txn_key;
-            
-            std::string txn_value;
-            leveldb::Status status =
-                summary_db_->Get(leveldb::ReadOptions(), txn_key, &txn_value);
-
-            if (status.ok() && !txn_value.empty()) {
-              try {
-                nlohmann::json txn_summary = nlohmann::json::parse(txn_value);
-                nlohmann::json txn_details;
-                if (txn_summary.contains("txn_commands")) {
-                  txn_details["txn_commands"] = txn_summary["txn_commands"];
-                }
-                if (txn_summary.contains("txn_keys")) {
-                  txn_details["txn_keys"] = txn_summary["txn_keys"];
-                }
-                if (txn_summary.contains("txn_values")) {
-                  txn_details["txn_values"] = txn_summary["txn_values"];
-                }
-                if (txn_summary.contains("propose_pre_prepare_time")) {
-                  txn_details["propose_pre_prepare_time"] =
-                      txn_summary["propose_pre_prepare_time"];
-                }
-                if (txn_summary.contains("prepare_time")) {
-                  txn_details["prepare_time"] = txn_summary["prepare_time"];
-                }
-                if (txn_summary.contains("commit_time")) {
-                  txn_details["commit_time"] = txn_summary["commit_time"];
-                }
-                if (txn_summary.contains("execution_time")) {
-                  txn_details["execution_time"] = 
txn_summary["execution_time"];
-                }
-                result["transaction_details"] = txn_details;
-              } catch (...) {
-                LOG(ERROR) << "Failed to parse transaction summary for txn: "
-                           << txn_number;
-              }
-            }
-            
-            std::string timeline_value;
-            status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, 
&timeline_value);
-            
-            if (status.ok() && !timeline_value.empty()) {
-              try {
-                nlohmann::json events_array = 
nlohmann::json::parse(timeline_value);
-                result["timeline"] = events_array;
-              } catch (...) {
-                LOG(ERROR) << "Failed to parse timeline for txn: " << 
txn_number;
-              }
-            } else {
-              LOG(ERROR) << "Timeline not found for txn: " << txn_number;
-            }
-
-            res.body = result.dump();
-            res.end();
-          });
-      app.port(8500 + transaction_summary_.port).multithreaded().run();
-      sleep(1);
-    } catch (const std::exception& e) {
-    }
-  }
-  app.stop();
-}
-
-bool Stats::IsFaulty() { return make_faulty_.load(); }
-
-void Stats::ChangePrimary(int primary_id) {
-  transaction_summary_.primary_id = primary_id;
-  make_faulty_.store(false);
-}
-
-void Stats::SetProps(int replica_id, std::string ip, int port,
-                     bool resview_flag, bool faulty_flag) {
-  transaction_summary_.replica_id = replica_id;
-  transaction_summary_.ip = ip;
-  transaction_summary_.port = port;
-  enable_resview = resview_flag;
-  enable_faulty_switch_ = faulty_flag;
-  if (resview_flag) {
-    // Single data directory for both summaries and timeline
-    std::string data_dir = "./resdb_data_" + std::to_string(port);
-    int mkdir_result = mkdir(data_dir.c_str(), 0755);
-    if (mkdir_result != 0 && errno != EEXIST) {
-      LOG(ERROR) << "Failed to create data directory: " << data_dir;
-    }
-    
-    // Initialize LevelDB for both transaction summaries and timeline events
-    std::string summary_path = data_dir + "/summaries";
-    leveldb::Options options;
-    options.create_if_missing = true;
-    leveldb::DB* db = nullptr;
-    leveldb::Status status = leveldb::DB::Open(options, summary_path, &db);
-    if (status.ok()) {
-      summary_db_.reset(db);
-      LOG(INFO) << "Initialized LevelDB storage at: " << summary_path;
-    } else {
-      LOG(ERROR) << "Failed to open LevelDB at " << summary_path << ": "
-                 << status.ToString();
-    }
-    
-    crow_thread_ = std::thread(&Stats::CrowRoute, this);
-  }
-}
-
-void Stats::SetPrimaryId(int primary_id) {
-  transaction_summary_.primary_id = primary_id;
-}
-
-void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
-                                    std::string level_db_stats,
-                                    std::string level_db_approx_mem_size) {
-  transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
-  transaction_summary_.level_db_stats_ = level_db_stats;
-  transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
-}
-
-void Stats::RecordStateTime(std::string state) {
-  if (!enable_resview) {
-    return;
-  }
-  if (state == "request" || state == "pre-prepare") {
-    transaction_summary_.request_pre_prepare_state_time =
-        std::chrono::system_clock::now();
-  } else if (state == "prepare") {
-    transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
-  } else if (state == "commit") {
-    transaction_summary_.commit_state_time = std::chrono::system_clock::now();
-  }
-}
-
-void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
-  if (!enable_resview) {
-    return;
-  }
-  transaction_summary_.txn_number = batch_request.seq();
-  transaction_summary_.txn_command.clear();
-  transaction_summary_.txn_key.clear();
-  transaction_summary_.txn_value.clear();
-  for (auto& sub_request : batch_request.user_requests()) {
-    KVRequest kv_request;
-    if (!kv_request.ParseFromString(sub_request.request().data())) {
-      break;
-    }
-    if (kv_request.cmd() == KVRequest::SET) {
-      transaction_summary_.txn_command.push_back("SET");
-      transaction_summary_.txn_key.push_back(kv_request.key());
-      transaction_summary_.txn_value.push_back(kv_request.value());
-    } else if (kv_request.cmd() == KVRequest::GET) {
-      transaction_summary_.txn_command.push_back("GET");
-      transaction_summary_.txn_key.push_back(kv_request.key());
-      transaction_summary_.txn_value.push_back("");
-    } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
-      transaction_summary_.txn_command.push_back("GETALLVALUES");
-      transaction_summary_.txn_key.push_back(kv_request.key());
-      transaction_summary_.txn_value.push_back("");
-    } else if (kv_request.cmd() == KVRequest::GETRANGE) {
-      transaction_summary_.txn_command.push_back("GETRANGE");
-      transaction_summary_.txn_key.push_back(kv_request.key());
-      transaction_summary_.txn_value.push_back(kv_request.value());
-    }
-  }
-}
-
-void Stats::SendSummary() {
-  if (!enable_resview) {
-    return;
-  }
-  transaction_summary_.execution_time = std::chrono::system_clock::now();
-
-  // Convert Transaction Summary to JSON
-  summary_json_["replica_id"] = transaction_summary_.replica_id;
-  summary_json_["ip"] = transaction_summary_.ip;
-  summary_json_["port"] = transaction_summary_.port;
-  summary_json_["primary_id"] = transaction_summary_.primary_id;
-  summary_json_["propose_pre_prepare_time"] =
-      transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
-          .count();
-  summary_json_["prepare_time"] =
-      transaction_summary_.prepare_state_time.time_since_epoch().count();
-  summary_json_["commit_time"] =
-      transaction_summary_.commit_state_time.time_since_epoch().count();
-  summary_json_["execution_time"] =
-      transaction_summary_.execution_time.time_since_epoch().count();
-  for (size_t i = 0;
-       i < transaction_summary_.prepare_message_count_times_list.size(); i++) {
-    summary_json_["prepare_message_timestamps"].push_back(
-        transaction_summary_.prepare_message_count_times_list[i]
-            .time_since_epoch()
-            .count());
-  }
-  for (size_t i = 0;
-       i < transaction_summary_.commit_message_count_times_list.size(); i++) {
-    summary_json_["commit_message_timestamps"].push_back(
-        transaction_summary_.commit_message_count_times_list[i]
-            .time_since_epoch()
-            .count());
-  }
-  summary_json_["txn_number"] = transaction_summary_.txn_number;
-  for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
-    summary_json_["txn_commands"].push_back(
-        transaction_summary_.txn_command[i]);
-  }
-  for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
-    summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
-  }
-  for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
-    summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
-  }
-
-  summary_json_["ext_cache_hit_ratio"] =
-      transaction_summary_.ext_cache_hit_ratio_;
-
-  std::string txn_key = std::to_string(transaction_summary_.txn_number);
-
-  if (summary_db_) {
-    std::lock_guard<std::mutex> lock(summary_db_mutex_);
-    
-    // Write transaction summary
-    leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key,
-                                              summary_json_.dump());
-    if (!status.ok()) {
-      LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key
-                 << ": " << status.ToString();
-    }
-    
-    // Batch write all buffered timeline events for this transaction
-    {
-      std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
-      auto it = timeline_buffer_.find(transaction_summary_.txn_number);
-      if (it != timeline_buffer_.end() && !it->second.empty()) {
-        std::string timeline_key = "timeline_" + txn_key;
-        
-        // Read existing timeline events if they exist, then merge
-        nlohmann::json events_array;
-        std::string existing_value;
-        leveldb::Status read_status = summary_db_->Get(leveldb::ReadOptions(), 
timeline_key, &existing_value);
-        if (read_status.ok() && !existing_value.empty()) {
-          try {
-            events_array = nlohmann::json::parse(existing_value);
-            // Ensure it's an array
-            if (!events_array.is_array()) {
-              events_array = nlohmann::json::array();
-            }
-          } catch (...) {
-            // If parsing fails, start with empty array
-            events_array = nlohmann::json::array();
-          }
-        } else {
-          events_array = nlohmann::json::array();
-        }
-        
-        // Append new events to existing array
-        for (const auto& event : it->second) {
-          events_array.push_back(event);
-        }
-        
-        // Write merged timeline events back to LevelDB
-        status = summary_db_->Put(leveldb::WriteOptions(), timeline_key,
-                                  events_array.dump());
-        if (!status.ok()) {
-          LOG(ERROR) << "Failed to write timeline to storage for txn: " << 
txn_key
-                     << ": " << status.ToString();
-        }
-        // Clean up buffer for this txn
-        timeline_buffer_.erase(it);
-      }
-    }
-  }
-
-  LOG(ERROR) << summary_json_.dump();
-
-  // Reset Transaction Summary Parameters
-  transaction_summary_.request_pre_prepare_state_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.prepare_state_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.commit_state_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.execution_time =
-      std::chrono::system_clock::time_point::min();
-  transaction_summary_.prepare_message_count_times_list.clear();
-  transaction_summary_.commit_message_count_times_list.clear();
-
-  summary_json_.clear();
-}
-
-void Stats::WriteTimelineEvent(uint64_t seq, const std::string& phase, int 
sender_id) {
-  if (!enable_resview) {
-    return;
-  }
-  
-  // Buffer timeline events in memory - written to LevelDB in batch during 
SendSummary()
-  std::lock_guard<std::mutex> lock(timeline_buffer_mutex_);
-  
-  // Create new event
-  nlohmann::json event;
-  event["timestamp"] = 
std::chrono::system_clock::now().time_since_epoch().count();
-  event["phase"] = phase;
-  if (sender_id >= 0) {
-    event["sender_id"] = sender_id;
-  }
-  
-  // Append to in-memory buffer for this seq
-  timeline_buffer_[seq].push_back(event);
-}
-
-void Stats::RecordNetworkRecv(uint64_t seq) {
-  WriteTimelineEvent(seq, "network_recv");
-}
-
-void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) {
-  WriteTimelineEvent(seq, "prepare_recv", sender_id);
-}
-
-void Stats::RecordCommitRecv(uint64_t seq, int sender_id) {
-  WriteTimelineEvent(seq, "commit_recv", sender_id);
-}
-
-void Stats::RecordExecuteStart(uint64_t seq) {
-  WriteTimelineEvent(seq, "execute_start");
-}
-
-void Stats::RecordExecuteEnd(uint64_t seq) {
-  WriteTimelineEvent(seq, "execute_end");
-}
-
-void Stats::RecordResponseSent(uint64_t seq) {
-  WriteTimelineEvent(seq, "response_sent");
-}
-
-void Stats::CleanupOldTimelineEntries() {
-  // Periodically clean up old buffer entries to prevent memory leaks
-  // Keep only entries within the last 10000 sequence numbers
-  std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
-  
-  if (timeline_buffer_.empty()) {
-    return;
-  }
-  
-  // Get the highest sequence number we've seen
-  uint64_t max_seq = timeline_buffer_.rbegin()->first;
-  uint64_t cleanup_threshold = (max_seq > 10000) ? (max_seq - 10000) : 0;
-  
-  // Remove all entries below the threshold
-  auto it = timeline_buffer_.begin();
-  while (it != timeline_buffer_.end() && it->first < cleanup_threshold) {
-    it = timeline_buffer_.erase(it);
-  }
-  
-  size_t buffer_size = timeline_buffer_.size();
-  if (buffer_size > 1000) {
-    LOG(WARNING) << "Timeline buffer size is large: " << buffer_size
-                 << " entries. This may indicate transactions not completing.";
-  }
-}
-
-void Stats::MonitorGlobal() {
-  LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
-
-  uint64_t seq_fail = 0;
-  uint64_t client_call = 0, socket_recv = 0;
-  uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 
0,
-           pending_execute = 0, execute = 0, execute_done = 0;
-  uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
-  uint64_t send_broad_cast_msg_per_rep = 0;
-  uint64_t server_call = 0, server_process = 0;
-  uint64_t seq_gap = 0;
-  uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
-
-  // ====== for client proxy ======
-  uint64_t run_req_num = 0, run_req_run_time = 0;
-
-  uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
-  // =============================
-
-  uint64_t last_seq_fail = 0;
-  uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
-           last_num_commit = 0;
-  uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
-  uint64_t last_client_call = 0, last_socket_recv = 0;
-  uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
-  uint64_t last_send_broad_cast_msg_per_rep = 0;
-  uint64_t last_server_call = 0, last_server_process = 0;
-  uint64_t last_total_request = 0, last_total_geo_request = 0,
-           last_geo_request = 0;
-  uint64_t time = 0;
-
-  while (!stop_) {
-    sleep(monitor_sleep_time_);
-    time += monitor_sleep_time_;
-    
-    // Periodically clean up old timeline buffer entries to prevent memory 
leaks
-    CleanupOldTimelineEntries();
-    
-    seq_fail = seq_fail_;
-    socket_recv = socket_recv_;
-    client_call = client_call_;
-    num_client_req = num_client_req_;
-    num_propose = num_propose_;
-    num_prepare = num_prepare_;
-    num_commit = num_commit_;
-    pending_execute = pending_execute_;
-    execute = execute_;
-    execute_done = execute_done_;
-    broad_cast_msg = broad_cast_msg_;
-    send_broad_cast_msg = send_broad_cast_msg_;
-    send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
-    server_call = server_call_;
-    server_process = server_process_;
-    seq_gap = seq_gap_;
-    total_request = total_request_;
-    total_geo_request = total_geo_request_;
-    geo_request = geo_request_;
-
-    run_req_num = run_req_num_;
-    run_req_run_time = run_req_run_time_;
-
-    LOG(ERROR) << "=========== monitor =========\n"
-               << "server call:" << server_call - last_server_call
-               << " server process:" << server_process - last_server_process
-               << " socket recv:" << socket_recv - last_socket_recv
-               << " "
-                  "client call:"
-               << client_call - last_client_call
-               << " "
-                  "client req:"
-               << num_client_req - last_num_client_req
-               << " "
-                  "broad_cast:"
-               << broad_cast_msg - last_broad_cast_msg
-               << " "
-                  "send broad_cast:"
-               << send_broad_cast_msg - last_send_broad_cast_msg
-               << " "
-                  "per send broad_cast:"
-               << send_broad_cast_msg_per_rep - 
last_send_broad_cast_msg_per_rep
-               << " "
-                  "propose:"
-               << num_propose - last_num_propose
-               << " "
-                  "prepare:"
-               << (num_prepare - last_num_prepare)
-               << " "
-                  "commit:"
-               << (num_commit - last_num_commit)
-               << " "
-                  "pending execute:"
-               << pending_execute - last_pending_execute
-               << " "
-                  "execute:"
-               << execute - last_execute
-               << " "
-                  "execute done:"
-               << execute_done - last_execute_done << " seq gap:" << seq_gap
-               << " total request:" << total_request - last_total_request
-               << " txn:" << (total_request - last_total_request) / 5
-               << " total geo request:"
-               << total_geo_request - last_total_geo_request
-               << " total geo request per:"
-               << (total_geo_request - last_total_geo_request) / 5
-               << " geo request:" << (geo_request - last_geo_request)
-               << " "
-                  "seq fail:"
-               << seq_fail - last_seq_fail << " time:" << time
-               << " "
-                  "\n--------------- monitor ------------";
-    if (run_req_num - last_run_req_num > 0) {
-      LOG(ERROR) << "  req client latency:"
-                 << static_cast<double>(run_req_run_time -
-                                        last_run_req_run_time) /
-                        (run_req_num - last_run_req_num) / 1000000000.0;
-    }
-
-    last_seq_fail = seq_fail;
-    last_socket_recv = socket_recv;
-    last_client_call = client_call;
-    last_num_client_req = num_client_req;
-    last_num_propose = num_propose;
-    last_num_prepare = num_prepare;
-    last_num_commit = num_commit;
-    last_pending_execute = pending_execute;
-    last_execute = execute;
-    last_execute_done = execute_done;
-
-    last_broad_cast_msg = broad_cast_msg;
-    last_send_broad_cast_msg = send_broad_cast_msg;
-    last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
-
-    last_server_call = server_call;
-    last_server_process = server_process;
-
-    last_run_req_num = run_req_num;
-    last_run_req_run_time = run_req_run_time;
-    last_total_request = total_request;
-    last_total_geo_request = total_geo_request;
-    last_geo_request = geo_request;
-  }
-}
-
-void Stats::IncClientCall() {
-  if (prometheus_) {
-    prometheus_->Inc(CLIENT_CALL, 1);
-  }
-  client_call_++;
-}
-
-void Stats::IncClientRequest() {
-  if (prometheus_) {
-    prometheus_->Inc(CLIENT_REQ, 1);
-  }
-  num_client_req_++;
-}
-
-void Stats::IncPropose() {
-  if (prometheus_) {
-    prometheus_->Inc(PROPOSE, 1);
-  }
-  num_propose_++;
-}
-
-void Stats::IncPrepare() {
-  if (prometheus_) {
-    prometheus_->Inc(PREPARE, 1);
-  }
-  num_prepare_++;
-  if (enable_resview) {
-    transaction_summary_.prepare_message_count_times_list.push_back(
-        std::chrono::system_clock::now());
-  }
-}
-
-void Stats::IncCommit() {
-  if (prometheus_) {
-    prometheus_->Inc(COMMIT, 1);
-  }
-  num_commit_++;
-  if (enable_resview) {
-    transaction_summary_.commit_message_count_times_list.push_back(
-        std::chrono::system_clock::now());
-  }
-}
-
-void Stats::IncPendingExecute() { pending_execute_++; }
-
-void Stats::IncExecute() { execute_++; }
-
-void Stats::IncExecuteDone() {
-  if (prometheus_) {
-    prometheus_->Inc(EXECUTE, 1);
-  }
-  execute_done_++;
-}
-
-void Stats::BroadCastMsg() {
-  if (prometheus_) {
-    prometheus_->Inc(BROAD_CAST, 1);
-  }
-  broad_cast_msg_++;
-}
-
-void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
-
-void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
-
-void Stats::SeqFail() { seq_fail_++; }
-
-void Stats::IncTotalRequest(uint32_t num) {
-  if (prometheus_) {
-    prometheus_->Inc(NUM_EXECUTE_TX, num);
-  }
-  total_request_ += num;
-}
-
-void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
-
-void Stats::IncGeoRequest() { geo_request_++; }
-
-void Stats::ServerCall() {
-  if (prometheus_) {
-    prometheus_->Inc(SERVER_CALL_NAME, 1);
-  }
-  server_call_++;
-}
-
-void Stats::ServerProcess() {
-  if (prometheus_) {
-    prometheus_->Inc(SERVER_PROCESS, 1);
-  }
-  server_process_++;
-}
-
-void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
-
-void Stats::AddLatency(uint64_t run_time) {
-  run_req_num_++;
-  run_req_run_time_ += run_time;
-}
-
-void Stats::SetPrometheus(const std::string& prometheus_address) {
-  prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
-}
-
-}  // namespace resdb
+ #include "platform/statistic/stats.h"
+
+ #include <glog/logging.h>
+ 
+ #include <ctime>
+ 
+ #include "common/utils/utils.h"
+ #include "proto/kv/kv.pb.h"
+ 
+ namespace asio = boost::asio;
+ namespace beast = boost::beast;
+ using tcp = asio::ip::tcp;
+ 
+ namespace resdb {
+ 
+ std::mutex g_mutex;
+ Stats* Stats::GetGlobalStats(int seconds) {
+   std::unique_lock<std::mutex> lk(g_mutex);
+   static Stats stats(seconds);
+   return &stats;
+ }  // gets a singelton instance of Stats Class
+ 
+ Stats::Stats(int sleep_time) {
+   monitor_sleep_time_ = sleep_time;
+ #ifdef TEST_MODE
+   monitor_sleep_time_ = 1;
+ #endif
+   num_call_ = 0;
+   num_commit_ = 0;
+   run_time_ = 0;
+   run_call_ = 0;
+   run_call_time_ = 0;
+   server_call_ = 0;
+   server_process_ = 0;
+   run_req_num_ = 0;
+   run_req_run_time_ = 0;
+   seq_gap_ = 0;
+   total_request_ = 0;
+   total_geo_request_ = 0;
+   geo_request_ = 0;
+ 
+   stop_ = false;
+   begin_ = false;
+ 
+   socket_recv_ = 0;
+   broad_cast_msg_ = 0;
+   send_broad_cast_msg_ = 0;
+ 
+   prometheus_ = nullptr;
+   global_thread_ =
+       std::thread(&Stats::MonitorGlobal, this);  // pass by reference
+ 
+   transaction_summary_.port = -1;
+ 
+   // Setup websocket here
+   make_faulty_.store(false);
+   transaction_summary_.request_pre_prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.commit_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.execution_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.txn_number = 0;
+ }
+ 
+ void Stats::Stop() { stop_ = true; }
+ 
+ Stats::~Stats() {
+   stop_ = true;
+   if (global_thread_.joinable()) {
+     global_thread_.join();
+   }
+   if (enable_resview && crow_thread_.joinable()) {
+     crow_thread_.join();
+   }
+ }
+ 
+ int64_t GetRSS() {
+   int64_t rss = 0;
+   FILE* fp = NULL;
+   if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
+     return 0;
+   }
+ 
+   uint64_t size, resident, share, text, lib, data, dt;
+   if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, 
&text,
+              &lib, &data, &dt) != 7) {
+     fclose(fp);
+     return 0;
+   }
+   fclose(fp);
+ 
+   int64_t page_size = sysconf(_SC_PAGESIZE);
+   rss = resident * page_size;
+ 
+   // Convert to MB
+   rss = rss / (1024 * 1024);
+ 
+   return rss;
+ }
+ 
+ void Stats::CrowRoute() {
+   crow::SimpleApp app;
+   while (!stop_) {
+     try {
+       CROW_ROUTE(app, "/consensus_data")
+           .methods("GET"_method)([this](const crow::request& req,
+                                         crow::response& res) {
+             LOG(ERROR) << "API 1";
+             res.set_header("Access-Control-Allow-Origin",
+                            "*");  // Allow requests from any origin
+             res.set_header("Access-Control-Allow-Methods",
+                            "GET, POST, OPTIONS");  // Specify allowed methods
+             res.set_header(
+                 "Access-Control-Allow-Headers",
+                 "Content-Type, Authorization");  // Specify allowed headers
+ 
+             // Send your response
+             res.body = consensus_history_.dump();
+             res.end();
+           });
+       CROW_ROUTE(app, "/get_status")
+           .methods("GET"_method)([this](const crow::request& req,
+                                         crow::response& res) {
+             LOG(ERROR) << "API 2";
+             res.set_header("Access-Control-Allow-Origin",
+                            "*");  // Allow requests from any origin
+             res.set_header("Access-Control-Allow-Methods",
+                            "GET, POST, OPTIONS");  // Specify allowed methods
+             res.set_header(
+                 "Access-Control-Allow-Headers",
+                 "Content-Type, Authorization");  // Specify allowed headers
+ 
+             // Send your response
+             res.body = IsFaulty() ? "Faulty" : "Not Faulty";
+             res.end();
+           });
+       CROW_ROUTE(app, "/make_faulty")
+           .methods("GET"_method)([this](const crow::request& req,
+                                         crow::response& res) {
+             LOG(ERROR) << "API 3";
+             res.set_header("Access-Control-Allow-Origin",
+                            "*");  // Allow requests from any origin
+             res.set_header("Access-Control-Allow-Methods",
+                            "GET, POST, OPTIONS");  // Specify allowed methods
+             res.set_header(
+                 "Access-Control-Allow-Headers",
+                 "Content-Type, Authorization");  // Specify allowed headers
+ 
+             // Send your response
+             if (enable_faulty_switch_) {
+               make_faulty_.store(!make_faulty_.load());
+             }
+             res.body = "Success";
+             res.end();
+           });
+       CROW_ROUTE(app, "/transaction_data")
+           .methods("GET"_method)([this](const crow::request& req,
+                                         crow::response& res) {
+             LOG(ERROR) << "API 4";
+             res.set_header("Access-Control-Allow-Origin",
+                            "*");  // Allow requests from any origin
+             res.set_header("Access-Control-Allow-Methods",
+                            "GET, POST, OPTIONS");  // Specify allowed methods
+             res.set_header(
+                 "Access-Control-Allow-Headers",
+                 "Content-Type, Authorization");  // Specify allowed headers
+ 
+             nlohmann::json mem_view_json;
+             int status =
+                 getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
+             if (status == 0) {
+               mem_view_json["resident_set_size"] = GetRSS();
+               mem_view_json["max_resident_set_size"] =
+                   transaction_summary_.process_stats_.ru_maxrss;
+               mem_view_json["num_reads"] =
+                   transaction_summary_.process_stats_.ru_inblock;
+               mem_view_json["num_writes"] =
+                   transaction_summary_.process_stats_.ru_oublock;
+             }
+ 
+             mem_view_json["ext_cache_hit_ratio"] =
+                 transaction_summary_.ext_cache_hit_ratio_;
+             mem_view_json["level_db_stats"] =
+                 transaction_summary_.level_db_stats_;
+             mem_view_json["level_db_approx_mem_size"] =
+                 transaction_summary_.level_db_approx_mem_size_;
+             res.body = mem_view_json.dump();
+             mem_view_json.clear();
+             res.end();
+           });
+       app.port(8500 + transaction_summary_.port).multithreaded().run();
+       sleep(1);
+     } catch (const std::exception& e) {
+     }
+   }
+   app.stop();
+ }
+ 
+ bool Stats::IsFaulty() { return make_faulty_.load(); }
+ 
+ void Stats::ChangePrimary(int primary_id) {
+   transaction_summary_.primary_id = primary_id;
+   make_faulty_.store(false);
+ }
+ 
+ void Stats::SetProps(int replica_id, std::string ip, int port,
+                      bool resview_flag, bool faulty_flag) {
+   transaction_summary_.replica_id = replica_id;
+   transaction_summary_.ip = ip;
+   transaction_summary_.port = port;
+   enable_resview = resview_flag;
+   enable_faulty_switch_ = faulty_flag;
+   if (resview_flag) {
+     crow_thread_ = std::thread(&Stats::CrowRoute, this);
+   }
+ }
+ 
+ void Stats::SetPrimaryId(int primary_id) {
+   transaction_summary_.primary_id = primary_id;
+ }
+ 
+ void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                                     std::string level_db_stats,
+                                     std::string level_db_approx_mem_size) {
+   transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
+   transaction_summary_.level_db_stats_ = level_db_stats;
+   transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+ }
+ 
+ void Stats::RecordStateTime(std::string state) {
+   if (!enable_resview) {
+     return;
+   }
+   if (state == "request" || state == "pre-prepare") {
+     transaction_summary_.request_pre_prepare_state_time =
+         std::chrono::system_clock::now();
+   } else if (state == "prepare") {
+     transaction_summary_.prepare_state_time = 
std::chrono::system_clock::now();
+   } else if (state == "commit") {
+     transaction_summary_.commit_state_time = std::chrono::system_clock::now();
+   }
+ }
+ 
+ void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+   if (!enable_resview) {
+     return;
+   }
+   transaction_summary_.txn_number = batch_request.seq();
+   transaction_summary_.txn_command.clear();
+   transaction_summary_.txn_key.clear();
+   transaction_summary_.txn_value.clear();
+   for (auto& sub_request : batch_request.user_requests()) {
+     KVRequest kv_request;
+     if (!kv_request.ParseFromString(sub_request.request().data())) {
+       break;
+     }
+     if (kv_request.cmd() == KVRequest::SET) {
+       transaction_summary_.txn_command.push_back("SET");
+       transaction_summary_.txn_key.push_back(kv_request.key());
+       transaction_summary_.txn_value.push_back(kv_request.value());
+     } else if (kv_request.cmd() == KVRequest::GET) {
+       transaction_summary_.txn_command.push_back("GET");
+       transaction_summary_.txn_key.push_back(kv_request.key());
+       transaction_summary_.txn_value.push_back("");
+     } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
+       transaction_summary_.txn_command.push_back("GETALLVALUES");
+       transaction_summary_.txn_key.push_back(kv_request.key());
+       transaction_summary_.txn_value.push_back("");
+     } else if (kv_request.cmd() == KVRequest::GETRANGE) {
+       transaction_summary_.txn_command.push_back("GETRANGE");
+       transaction_summary_.txn_key.push_back(kv_request.key());
+       transaction_summary_.txn_value.push_back(kv_request.value());
+     }
+   }
+ }
+ 
+ void Stats::SendSummary() {
+   if (!enable_resview) {
+     return;
+   }
+   transaction_summary_.execution_time = std::chrono::system_clock::now();
+ 
+   // Convert Transaction Summary to JSON
+   summary_json_["replica_id"] = transaction_summary_.replica_id;
+   summary_json_["ip"] = transaction_summary_.ip;
+   summary_json_["port"] = transaction_summary_.port;
+   summary_json_["primary_id"] = transaction_summary_.primary_id;
+   summary_json_["propose_pre_prepare_time"] =
+       transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+           .count();
+   summary_json_["prepare_time"] =
+       transaction_summary_.prepare_state_time.time_since_epoch().count();
+   summary_json_["commit_time"] =
+       transaction_summary_.commit_state_time.time_since_epoch().count();
+   summary_json_["execution_time"] =
+       transaction_summary_.execution_time.time_since_epoch().count();
+   for (size_t i = 0;
+        i < transaction_summary_.prepare_message_count_times_list.size(); i++) 
{
+     summary_json_["prepare_message_timestamps"].push_back(
+         transaction_summary_.prepare_message_count_times_list[i]
+             .time_since_epoch()
+             .count());
+   }
+   for (size_t i = 0;
+        i < transaction_summary_.commit_message_count_times_list.size(); i++) {
+     summary_json_["commit_message_timestamps"].push_back(
+         transaction_summary_.commit_message_count_times_list[i]
+             .time_since_epoch()
+             .count());
+   }
+   summary_json_["txn_number"] = transaction_summary_.txn_number;
+   for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
+     summary_json_["txn_commands"].push_back(
+         transaction_summary_.txn_command[i]);
+   }
+   for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
+     summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
+   }
+   for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
+     summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
+   }
+ 
+   summary_json_["ext_cache_hit_ratio"] =
+       transaction_summary_.ext_cache_hit_ratio_;
+   consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+       summary_json_;
+ 
+   LOG(ERROR) << summary_json_.dump();
+ 
+   // Reset Transaction Summary Parameters
+   transaction_summary_.request_pre_prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.commit_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.execution_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.prepare_message_count_times_list.clear();
+   transaction_summary_.commit_message_count_times_list.clear();
+ 
+   summary_json_.clear();
+ }
+ 
+ void Stats::MonitorGlobal() {
+   LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
+ 
+   uint64_t seq_fail = 0;
+   uint64_t client_call = 0, socket_recv = 0;
+   uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 
0,
+            pending_execute = 0, execute = 0, execute_done = 0;
+   uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
+   uint64_t send_broad_cast_msg_per_rep = 0;
+   uint64_t server_call = 0, server_process = 0;
+   uint64_t seq_gap = 0;
+   uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
+ 
+   // ====== for client proxy ======
+   uint64_t run_req_num = 0, run_req_run_time = 0;
+ 
+   uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
+   // =============================
+ 
+   uint64_t last_seq_fail = 0;
+   uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 
0,
+            last_num_commit = 0;
+   uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
+   uint64_t last_client_call = 0, last_socket_recv = 0;
+   uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
+   uint64_t last_send_broad_cast_msg_per_rep = 0;
+   uint64_t last_server_call = 0, last_server_process = 0;
+   uint64_t last_total_request = 0, last_total_geo_request = 0,
+            last_geo_request = 0;
+   uint64_t time = 0;
+ 
+   while (!stop_) {
+     sleep(monitor_sleep_time_);
+     time += monitor_sleep_time_;
+     seq_fail = seq_fail_;
+     socket_recv = socket_recv_;
+     client_call = client_call_;
+     num_client_req = num_client_req_;
+     num_propose = num_propose_;
+     num_prepare = num_prepare_;
+     num_commit = num_commit_;
+     pending_execute = pending_execute_;
+     execute = execute_;
+     execute_done = execute_done_;
+     broad_cast_msg = broad_cast_msg_;
+     send_broad_cast_msg = send_broad_cast_msg_;
+     send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
+     server_call = server_call_;
+     server_process = server_process_;
+     seq_gap = seq_gap_;
+     total_request = total_request_;
+     total_geo_request = total_geo_request_;
+     geo_request = geo_request_;
+ 
+     run_req_num = run_req_num_;
+     run_req_run_time = run_req_run_time_;
+ 
+     LOG(ERROR) << "=========== monitor =========\n"
+                << "server call:" << server_call - last_server_call
+                << " server process:" << server_process - last_server_process
+                << " socket recv:" << socket_recv - last_socket_recv
+                << " "
+                   "client call:"
+                << client_call - last_client_call
+                << " "
+                   "client req:"
+                << num_client_req - last_num_client_req
+                << " "
+                   "broad_cast:"
+                << broad_cast_msg - last_broad_cast_msg
+                << " "
+                   "send broad_cast:"
+                << send_broad_cast_msg - last_send_broad_cast_msg
+                << " "
+                   "per send broad_cast:"
+                << send_broad_cast_msg_per_rep - 
last_send_broad_cast_msg_per_rep
+                << " "
+                   "propose:"
+                << num_propose - last_num_propose
+                << " "
+                   "prepare:"
+                << (num_prepare - last_num_prepare)
+                << " "
+                   "commit:"
+                << (num_commit - last_num_commit)
+                << " "
+                   "pending execute:"
+                << pending_execute - last_pending_execute
+                << " "
+                   "execute:"
+                << execute - last_execute
+                << " "
+                   "execute done:"
+                << execute_done - last_execute_done << " seq gap:" << seq_gap
+                << " total request:" << total_request - last_total_request
+                << " txn:" << (total_request - last_total_request) / 5
+                << " total geo request:"
+                << total_geo_request - last_total_geo_request
+                << " total geo request per:"
+                << (total_geo_request - last_total_geo_request) / 5
+                << " geo request:" << (geo_request - last_geo_request)
+                << " "
+                   "seq fail:"
+                << seq_fail - last_seq_fail << " time:" << time
+                << " "
+                   "\n--------------- monitor ------------";
+     if (run_req_num - last_run_req_num > 0) {
+       LOG(ERROR) << "  req client latency:"
+                  << static_cast<double>(run_req_run_time -
+                                         last_run_req_run_time) /
+                         (run_req_num - last_run_req_num) / 1000000000.0;
+     }
+ 
+     last_seq_fail = seq_fail;
+     last_socket_recv = socket_recv;
+     last_client_call = client_call;
+     last_num_client_req = num_client_req;
+     last_num_propose = num_propose;
+     last_num_prepare = num_prepare;
+     last_num_commit = num_commit;
+     last_pending_execute = pending_execute;
+     last_execute = execute;
+     last_execute_done = execute_done;
+ 
+     last_broad_cast_msg = broad_cast_msg;
+     last_send_broad_cast_msg = send_broad_cast_msg;
+     last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
+ 
+     last_server_call = server_call;
+     last_server_process = server_process;
+ 
+     last_run_req_num = run_req_num;
+     last_run_req_run_time = run_req_run_time;
+     last_total_request = total_request;
+     last_total_geo_request = total_geo_request;
+     last_geo_request = geo_request;
+   }
+ }
+ 
+ void Stats::IncClientCall() {
+   if (prometheus_) {
+     prometheus_->Inc(CLIENT_CALL, 1);
+   }
+   client_call_++;
+ }
+ 
+ void Stats::IncClientRequest() {
+   if (prometheus_) {
+     prometheus_->Inc(CLIENT_REQ, 1);
+   }
+   num_client_req_++;
+ }
+ 
+ void Stats::IncPropose() {
+   if (prometheus_) {
+     prometheus_->Inc(PROPOSE, 1);
+   }
+   num_propose_++;
+ }
+ 
+ void Stats::IncPrepare() {
+   if (prometheus_) {
+     prometheus_->Inc(PREPARE, 1);
+   }
+   num_prepare_++;
+   transaction_summary_.prepare_message_count_times_list.push_back(
+       std::chrono::system_clock::now());
+ }
+ 
+ void Stats::IncCommit() {
+   if (prometheus_) {
+     prometheus_->Inc(COMMIT, 1);
+   }
+   num_commit_++;
+   transaction_summary_.commit_message_count_times_list.push_back(
+       std::chrono::system_clock::now());
+ }
+ 
+ void Stats::IncPendingExecute() { pending_execute_++; }
+ 
+ void Stats::IncExecute() { execute_++; }
+ 
+ void Stats::IncExecuteDone() {
+   if (prometheus_) {
+     prometheus_->Inc(EXECUTE, 1);
+   }
+   execute_done_++;
+ }
+ 
+ void Stats::BroadCastMsg() {
+   if (prometheus_) {
+     prometheus_->Inc(BROAD_CAST, 1);
+   }
+   broad_cast_msg_++;
+ }
+ 
+ void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
+ 
+ void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
+ 
+ void Stats::SeqFail() { seq_fail_++; }
+ 
+ void Stats::IncTotalRequest(uint32_t num) {
+   if (prometheus_) {
+     prometheus_->Inc(NUM_EXECUTE_TX, num);
+   }
+   total_request_ += num;
+ }
+ 
+ void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
+ 
+ void Stats::IncGeoRequest() { geo_request_++; }
+ 
+ void Stats::ServerCall() {
+   if (prometheus_) {
+     prometheus_->Inc(SERVER_CALL_NAME, 1);
+   }
+   server_call_++;
+ }
+ 
+ void Stats::ServerProcess() {
+   if (prometheus_) {
+     prometheus_->Inc(SERVER_PROCESS, 1);
+   }
+   server_process_++;
+ }
+ 
+ void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
+ 
+ void Stats::AddLatency(uint64_t run_time) {
+   run_req_num_++;
+   run_req_run_time_ += run_time;
+ }
+ 
+ void Stats::SetPrometheus(const std::string& prometheus_address) {
+   prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
+ }
+ 
+ }  // namespace resdb
\ No newline at end of file
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index a2524fc1..c0ad30a7 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -17,177 +17,151 @@
  * under the License.
  */
 
-#pragma once
-
-#include <crow.h>
-
-#include <chrono>
-#include <future>
-#include <map>
-#include <nlohmann/json.hpp>
-
-#include "boost/asio.hpp"
-#include "boost/beast.hpp"
-#include "platform/common/network/tcp_socket.h"
-#include "platform/proto/resdb.pb.h"
-#include "platform/statistic/prometheus_handler.h"
-#include "proto/kv/kv.pb.h"
-#include "sys/resource.h"
-
-// Forward declaration for LevelDB
-namespace leveldb {
-class DB;
-}
-
-namespace asio = boost::asio;
-namespace beast = boost::beast;
-using tcp = asio::ip::tcp;
-
-namespace resdb {
-
-struct VisualData {
-  // Set when initializing
-  int replica_id;
-  int primary_id;
-  std::string ip;
-  int port;
-
-  // Set when new txn is received
-  int txn_number;
-  std::vector<std::string> txn_command;
-  std::vector<std::string> txn_key;
-  std::vector<std::string> txn_value;
-
-  // Request state if primary_id==replica_id, pre_prepare state otherwise
-  std::chrono::system_clock::time_point request_pre_prepare_state_time;
-  std::chrono::system_clock::time_point prepare_state_time;
-  std::vector<std::chrono::system_clock::time_point>
-      prepare_message_count_times_list;
-  std::chrono::system_clock::time_point commit_state_time;
-  std::vector<std::chrono::system_clock::time_point>
-      commit_message_count_times_list;
-  std::chrono::system_clock::time_point execution_time;
-
-  // Storage Engine Stats
-  double ext_cache_hit_ratio_;
-  std::string level_db_stats_;
-  std::string level_db_approx_mem_size_;
-
-  // process stats
-  struct rusage process_stats_;
-};
-
-class Stats {
- public:
-  static Stats* GetGlobalStats(int sleep_seconds = 5);
-
-  void Stop();
-
-  void RetrieveProgress();
-  void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
-                bool faulty_flag);
-  void SetPrimaryId(int primary_id);
-  void SetStorageEngineMetrics(double ext_cache_hit_ratio,
-                               std::string level_db_stats,
-                               std::string level_db_approx_mem_size);
-  void RecordStateTime(std::string state);
-  void GetTransactionDetails(BatchUserRequest batch_request);
-  void SendSummary();
-  void CrowRoute();
-  bool IsFaulty();
-  void ChangePrimary(int primary_id);
-
-  // Timeline event recording (disk-based, only when resview enabled)
-  void RecordNetworkRecv(uint64_t seq);
-  void RecordPrepareRecv(uint64_t seq, int sender_id);
-  void RecordCommitRecv(uint64_t seq, int sender_id);
-  void RecordExecuteStart(uint64_t seq);
-  void RecordExecuteEnd(uint64_t seq);
-  void RecordResponseSent(uint64_t seq);
-  void CleanupOldTimelineEntries();  // Prevent timeline buffer memory leak
-
-  void AddLatency(uint64_t run_time);
-
-  void Monitor();
-  void MonitorGlobal();
-
-  void IncSocketRecv();
-
-  void IncClientCall();
-
-  void IncClientRequest();
-  void IncPropose();
-  void IncPrepare();
-  void IncCommit();
-  void IncPendingExecute();
-  void IncExecute();
-  void IncExecuteDone();
-
-  void BroadCastMsg();
-  void SendBroadCastMsg(uint32_t num);
-  void SendBroadCastMsgPerRep();
-  void SeqFail();
-  void IncTotalRequest(uint32_t num);
-  void IncTotalGeoRequest(uint32_t num);
-  void IncGeoRequest();
-
-  void SeqGap(uint64_t seq_gap);
-  // Network in->worker
-  void ServerCall();
-  void ServerProcess();
-  void SetPrometheus(const std::string& prometheus_address);
-
- protected:
-  Stats(int sleep_time = 5);
-  ~Stats();
-
- private:
-  std::string monitor_port_ = "default";
-  std::string name_;
-  std::atomic<int> num_call_, run_call_;
-  std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
-  std::thread thread_;
-  std::atomic<bool> begin_;
-  std::atomic<bool> stop_;
-  std::condition_variable cv_;
-  std::mutex mutex_;
-
-  std::thread global_thread_;
-  std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
-      num_commit_, pending_execute_, execute_, execute_done_;
-  std::atomic<uint64_t> client_call_, socket_recv_;
-  std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
-      send_broad_cast_msg_per_rep_;
-  std::atomic<uint64_t> seq_fail_;
-  std::atomic<uint64_t> server_call_, server_process_;
-  std::atomic<uint64_t> run_req_num_;
-  std::atomic<uint64_t> run_req_run_time_;
-  std::atomic<uint64_t> seq_gap_;
-  std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
-  int monitor_sleep_time_ = 5;  // default 5s.
-
-  std::thread crow_thread_;
-  bool enable_resview;
-  bool enable_faulty_switch_;
-  VisualData transaction_summary_;
-  std::atomic<bool> make_faulty_;
-  std::atomic<uint64_t> prev_num_prepare_;
-  std::atomic<uint64_t> prev_num_commit_;
-  nlohmann::json summary_json_;
-
-  std::unique_ptr<PrometheusHandler> prometheus_;
-  
-  std::unique_ptr<leveldb::DB> summary_db_;
-  std::mutex summary_db_mutex_;
-  
-  // In-memory timeline event buffer (per-seq) - batched writes to LevelDB
-  std::map<uint64_t, std::vector<nlohmann::json>> timeline_buffer_;
-  std::mutex timeline_buffer_mutex_;
-  
-  // Timeline directory for per-transaction event logs
-  std::string timeline_dir_;
-  std::mutex timeline_mutex_;
-  void WriteTimelineEvent(uint64_t seq, const std::string& phase, int 
sender_id = -1);
-};
-
-}  // namespace resdb
+ #pragma once
+
+ #include <crow.h>
+ 
+ #include <chrono>
+ #include <future>
+ #include <nlohmann/json.hpp>
+ 
+ #include "boost/asio.hpp"
+ #include "boost/beast.hpp"
+ #include "platform/common/network/tcp_socket.h"
+ #include "platform/proto/resdb.pb.h"
+ #include "platform/statistic/prometheus_handler.h"
+ #include "proto/kv/kv.pb.h"
+ #include "sys/resource.h"
+ 
+ namespace asio = boost::asio;
+ namespace beast = boost::beast;
+ using tcp = asio::ip::tcp;
+ 
+ namespace resdb {
+ 
+ struct VisualData {
+   // Set when initializing
+   int replica_id;
+   int primary_id;
+   std::string ip;
+   int port;
+ 
+   // Set when new txn is received
+   int txn_number;
+   std::vector<std::string> txn_command;
+   std::vector<std::string> txn_key;
+   std::vector<std::string> txn_value;
+ 
+   // Request state if primary_id==replica_id, pre_prepare state otherwise
+   std::chrono::system_clock::time_point request_pre_prepare_state_time;
+   std::chrono::system_clock::time_point prepare_state_time;
+   std::vector<std::chrono::system_clock::time_point>
+       prepare_message_count_times_list;
+   std::chrono::system_clock::time_point commit_state_time;
+   std::vector<std::chrono::system_clock::time_point>
+       commit_message_count_times_list;
+   std::chrono::system_clock::time_point execution_time;
+ 
+   // Storage Engine Stats
+   double ext_cache_hit_ratio_;
+   std::string level_db_stats_;
+   std::string level_db_approx_mem_size_;
+ 
+   // process stats
+   struct rusage process_stats_;
+ };
+ 
+ class Stats {
+  public:
+   static Stats* GetGlobalStats(int sleep_seconds = 5);
+ 
+   void Stop();
+ 
+   void RetrieveProgress();
+   void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
+                 bool faulty_flag);
+   void SetPrimaryId(int primary_id);
+   void SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                                std::string level_db_stats,
+                                std::string level_db_approx_mem_size);
+   void RecordStateTime(std::string state);
+   void GetTransactionDetails(BatchUserRequest batch_request);
+   void SendSummary();
+   void CrowRoute();
+   bool IsFaulty();
+   void ChangePrimary(int primary_id);
+ 
+   void AddLatency(uint64_t run_time);
+ 
+   void Monitor();
+   void MonitorGlobal();
+ 
+   void IncSocketRecv();
+ 
+   void IncClientCall();
+ 
+   void IncClientRequest();
+   void IncPropose();
+   void IncPrepare();
+   void IncCommit();
+   void IncPendingExecute();
+   void IncExecute();
+   void IncExecuteDone();
+ 
+   void BroadCastMsg();
+   void SendBroadCastMsg(uint32_t num);
+   void SendBroadCastMsgPerRep();
+   void SeqFail();
+   void IncTotalRequest(uint32_t num);
+   void IncTotalGeoRequest(uint32_t num);
+   void IncGeoRequest();
+ 
+   void SeqGap(uint64_t seq_gap);
+   // Network in->worker
+   void ServerCall();
+   void ServerProcess();
+   void SetPrometheus(const std::string& prometheus_address);
+ 
+  protected:
+   Stats(int sleep_time = 5);
+   ~Stats();
+ 
+  private:
+   std::string monitor_port_ = "default";
+   std::string name_;
+   std::atomic<int> num_call_, run_call_;
+   std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
+   std::thread thread_;
+   std::atomic<bool> begin_;
+   std::atomic<bool> stop_;
+   std::condition_variable cv_;
+   std::mutex mutex_;
+ 
+   std::thread global_thread_;
+   std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
+       num_commit_, pending_execute_, execute_, execute_done_;
+   std::atomic<uint64_t> client_call_, socket_recv_;
+   std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
+       send_broad_cast_msg_per_rep_;
+   std::atomic<uint64_t> seq_fail_;
+   std::atomic<uint64_t> server_call_, server_process_;
+   std::atomic<uint64_t> run_req_num_;
+   std::atomic<uint64_t> run_req_run_time_;
+   std::atomic<uint64_t> seq_gap_;
+   std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
+   int monitor_sleep_time_ = 5;  // default 5s.
+ 
+   std::thread crow_thread_;
+   bool enable_resview;
+   bool enable_faulty_switch_;
+   VisualData transaction_summary_;
+   std::atomic<bool> make_faulty_;
+   std::atomic<uint64_t> prev_num_prepare_;
+   std::atomic<uint64_t> prev_num_commit_;
+   nlohmann::json summary_json_;
+   nlohmann::json consensus_history_;
+ 
+   std::unique_ptr<PrometheusHandler> prometheus_;
+ };
+ 
+ }  // namespace resdb
\ No newline at end of file

Reply via email to