This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch demo-final
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit a3330963e64aee95646e8643e166e91cf6927f02
Author: bchou9 <[email protected]>
AuthorDate: Wed Jan 7 20:25:19 2026 +0000

    Added back stats changes
---
 .gitignore                                         |   3 +-
 .../consensus/execution/transaction_executor.cpp   |   4 +
 platform/consensus/ordering/pbft/commitment.cpp    |  10 +-
 platform/networkstrate/service_network.cpp         |  12 +
 platform/statistic/BUILD                           |   5 +
 platform/statistic/stats.cpp                       | 403 +++++++++++++++++++--
 platform/statistic/stats.h                         |  28 +-
 7 files changed, 438 insertions(+), 27 deletions(-)

diff --git a/.gitignore b/.gitignore
index be0ae77a..d4b8448d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,4 +21,5 @@ resdb/
 100*_db/
 gmon.out
 .history/
-*_db/
\ No newline at end of file
+*_db/
+resdb_data_*/*
\ No newline at end of file
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 0a7401bc..71c228a0 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -302,6 +302,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
 
   std::unique_ptr<BatchUserResponse> response;
   global_stats_->GetTransactionDetails(*batch_request_p);
+  uint64_t seq = request->seq();
+  global_stats_->RecordExecuteStart(seq);
   if (transaction_manager_ && need_execute) {
     if (execute_thread_num_ == 1) {
       response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -354,6 +356,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
   if (response == nullptr) {
     response = std::make_unique<BatchUserResponse>();
   }
+  global_stats_->RecordExecuteEnd(seq);
   global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
   response->set_proxy_id(batch_request_p->proxy_id());
   response->set_createtime(batch_request_p->createtime() +
@@ -363,6 +366,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
   response->set_seq(request->seq());
   if (post_exec_func_) {
     post_exec_func_(std::move(request), std::move(response));
+    global_stats_->RecordResponseSent(seq);
   }
 
   global_stats_->IncExecuteDone();
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 463d0161..f8ab3d9d 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -263,14 +263,16 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
     }
     return ret;
   }
-  // global_stats_->IncPrepare();
+  global_stats_->IncPrepare();
+  uint64_t seq = request->seq();
+  int sender_id = request->sender_id();
+  global_stats_->RecordPrepareRecv(seq, sender_id);
   std::unique_ptr<Request> commit_request = resdb::NewRequest(
       Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
   commit_request->mutable_data_signature()->Clear();
   // Add request to message_manager.
   // If it has received enough same requests(2f+1), broadcast the commit
   // message.
-  uint64_t seq = request->seq();
   CollectorResultCode ret =
       message_manager_->AddConsensusMsg(context->signature, 
std::move(request));
   if (ret == CollectorResultCode::STATE_CHANGED) {
@@ -303,11 +305,13 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
     return -2;
   }
   uint64_t seq = request->seq();
+  int sender_id = request->sender_id();
   if (request->is_recovery()) {
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
   }
-  // global_stats_->IncCommit();
+  global_stats_->IncCommit();
+  global_stats_->RecordCommitRecv(seq, sender_id);
   // Add request to message_manager.
   // If it has received enough same requests(2f+1), message manager will
   // commit the request.
diff --git a/platform/networkstrate/service_network.cpp b/platform/networkstrate/service_network.cpp
index 1e69fd25..0bc60579 100644
--- a/platform/networkstrate/service_network.cpp
+++ b/platform/networkstrate/service_network.cpp
@@ -26,6 +26,7 @@
 
 #include "platform/common/network/tcp_socket.h"
 #include "platform/proto/broadcast.pb.h"
+#include "platform/proto/resdb.pb.h"
 
 namespace resdb {
 
@@ -73,6 +74,17 @@ void ServiceNetwork::AcceptorHandler(const char* buffer, size_t data_len) {
     item->data = std::move(sub_request_info);
     // LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data
     // len:"<<item->data->data_len;
+    
+    // Try to extract seq from request for timeline tracking
+    try {
+      Request request;
+      if (request.ParseFromArray(sub_data.data(), sub_data.size())) {
+        global_stats_->RecordNetworkRecv(request.seq());
+      }
+    } catch (...) {
+      // Ignore parse errors, seq extraction is optional
+    }
+    
     global_stats_->ServerCall();
     input_queue_.Push(std::move(item));
   }
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 92328f16..042668e1 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -26,6 +26,10 @@ cc_library(
     name = "stats",
     srcs = ["stats.cpp"],
     hdrs = ["stats.h"],
+    copts = select({
+        "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"],
+        "//conditions:default": [],
+    }),
     deps = [
         ":prometheus_handler",
         "//common:asio",
@@ -38,6 +42,7 @@ cc_library(
         "//proto/kv:kv_cc_proto",
         "//third_party:crow",
         "//third_party:prometheus",
+        "//third_party:leveldb",
     ],
 )
 
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 21f1524f..5b4e3aec 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -22,8 +22,15 @@
 #include <glog/logging.h>
 
 #include <ctime>
+#include <sstream>
+#include <fstream>
+#include <sys/stat.h>
+#include <errno.h>
 
 #include "common/utils/utils.h"
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/write_batch.h"
 #include "proto/kv/kv.pb.h"
 
 namespace asio = boost::asio;
@@ -124,22 +131,99 @@ void Stats::CrowRoute() {
   crow::SimpleApp app;
   while (!stop_) {
     try {
+      //Deprecated
       CROW_ROUTE(app, "/consensus_data")
-          .methods("GET"_method)([this](const crow::request& req,
-                                        crow::response& res) {
-            LOG(ERROR) << "API 1";
-            res.set_header("Access-Control-Allow-Origin",
-                           "*");  // Allow requests from any origin
-            res.set_header("Access-Control-Allow-Methods",
-                           "GET, POST, OPTIONS");  // Specify allowed methods
-            res.set_header(
-                "Access-Control-Allow-Headers",
-                "Content-Type, Authorization");  // Specify allowed headers
-
-            // Send your response
-            res.body = consensus_history_.dump();
-            res.end();
-          });
+          .methods("GET"_method)(
+              [this](const crow::request& req, crow::response& res) {
+                LOG(ERROR) << "API 1";
+                res.set_header("Access-Control-Allow-Origin", "*");
+                res.set_header("Access-Control-Allow-Methods",
+                               "GET, POST, OPTIONS");
+                res.set_header("Access-Control-Allow-Headers",
+                               "Content-Type, Authorization");
+
+                int limit = 10; 
+                int page = 1;
+                
+                std::string url = req.url;
+                size_t query_pos = url.find('?');
+                if (query_pos != std::string::npos) {
+                  std::string query_string = url.substr(query_pos + 1);
+                  std::istringstream iss(query_string);
+                  std::string param;
+                  while (std::getline(iss, param, '&')) {
+                    size_t eq_pos = param.find('=');
+                    if (eq_pos != std::string::npos) {
+                      std::string key = param.substr(0, eq_pos);
+                      std::string value = param.substr(eq_pos + 1);
+                      if (key == "limit") {
+                        try {
+                          limit = std::stoi(value);
+                          if (limit <= 0) limit = 10;
+                        } catch (...) {
+                          limit = 10;
+                        }
+                      } else if (key == "page") {
+                        try {
+                          page = std::stoi(value);
+                          if (page <= 0) page = 1;
+                        } catch (...) {
+                          page = 1;
+                        }
+                      }
+                    }
+                  }
+                }
+
+                nlohmann::json result;
+                int total_count = 0;
+                int offset = (page - 1) * limit;
+                int current_index = 0;
+                int items_collected = 0;
+
+                if (summary_db_) {
+                  std::lock_guard<std::mutex> lock(summary_db_mutex_);
+                  leveldb::Iterator* it =
+                      summary_db_->NewIterator(leveldb::ReadOptions());
+                  
+                  // Count only non-timeline entries
+                  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+                    std::string key = it->key().ToString();
+                    if (key.find("timeline_") != 0) {
+                      total_count++;
+                    }
+                  }
+                  
+                  // Iterate and collect only non-timeline entries
+                  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+                    std::string key = it->key().ToString();
+                    // Skip timeline entries
+                    if (key.find("timeline_") == 0) {
+                      continue;
+                    }
+                    
+                    if (current_index >= offset && items_collected < limit) {
+                      try {
+                        result[key] =
+                            nlohmann::json::parse(it->value().ToString());
+                        items_collected++;
+                      } catch (...) {
+                        res.code = 500;
+                        result["error"] = "Failed to parse transaction data";
+                        delete it;
+                        res.body = result.dump();
+                        res.end();
+                        return;
+                      }
+                    }
+                    current_index++;
+                  }
+                  delete it;
+                }
+
+                res.body = result.dump();
+                res.end();
+              });
       CROW_ROUTE(app, "/get_status")
           .methods("GET"_method)([this](const crow::request& req,
                                         crow::response& res) {
@@ -168,11 +252,12 @@ void Stats::CrowRoute() {
                 "Access-Control-Allow-Headers",
                 "Content-Type, Authorization");  // Specify allowed headers
 
+            res.body = "Not Enabled";
             // Send your response
             if (enable_faulty_switch_) {
               make_faulty_.store(!make_faulty_.load());
+              res.body = "Success";
             }
-            res.body = "Success";
             res.end();
           });
       CROW_ROUTE(app, "/transaction_data")
@@ -210,6 +295,126 @@ void Stats::CrowRoute() {
             mem_view_json.clear();
             res.end();
           });
+      CROW_ROUTE(app, "/consensus_data/<int>")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res, int txn_number) {
+            LOG(ERROR) << "API 5: Get transaction " << txn_number;
+            res.set_header("Access-Control-Allow-Origin",
+                           "*");  // Allow requests from any origin
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");  // Specify allowed methods
+            res.set_header(
+                "Access-Control-Allow-Headers",
+                "Content-Type, Authorization");  // Specify allowed headers
+
+            nlohmann::json result;
+            if (summary_db_) {
+              std::lock_guard<std::mutex> lock(summary_db_mutex_);
+              std::string txn_key = std::to_string(txn_number);
+              std::string value;
+              leveldb::Status status =
+                  summary_db_->Get(leveldb::ReadOptions(), txn_key, &value);
+
+              if (status.ok() && !value.empty()) {
+                try {
+                  result[txn_key] = nlohmann::json::parse(value);
+                } catch (...) {
+                  res.code = 500;
+                  result["error"] = "Failed to parse transaction data";
+                }
+              } else {
+                res.code = 404;
+                result["error"] = "Transaction not found";
+                result["txn_number"] = txn_number;
+              }
+            } else {
+              res.code = 503;
+              result["error"] = "Storage not initialized";
+            }
+
+            res.body = result.dump();
+            res.end();
+          });
+
+      CROW_ROUTE(app, "/transaction_timeline/<int>")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res, int txn_number) {
+            LOG(ERROR) << "API 6: Get transaction timeline " << txn_number;
+            res.set_header("Access-Control-Allow-Origin", "*");
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");
+            res.set_header("Access-Control-Allow-Headers",
+                           "Content-Type, Authorization");
+
+            nlohmann::json result;
+            result["txn_number"] = txn_number;
+            
+            if (!summary_db_) {
+              res.code = 503;
+              result["error"] = "Storage not initialized";
+              res.body = result.dump();
+              res.end();
+              return;
+            }
+            
+            std::lock_guard<std::mutex> lock(summary_db_mutex_);
+            std::string txn_key = std::to_string(txn_number);
+            std::string timeline_key = "timeline_" + txn_key;
+            
+            std::string txn_value;
+            leveldb::Status status =
+                summary_db_->Get(leveldb::ReadOptions(), txn_key, &txn_value);
+
+            if (status.ok() && !txn_value.empty()) {
+              try {
+                nlohmann::json txn_summary = nlohmann::json::parse(txn_value);
+                nlohmann::json txn_details;
+                if (txn_summary.contains("txn_commands")) {
+                  txn_details["txn_commands"] = txn_summary["txn_commands"];
+                }
+                if (txn_summary.contains("txn_keys")) {
+                  txn_details["txn_keys"] = txn_summary["txn_keys"];
+                }
+                if (txn_summary.contains("txn_values")) {
+                  txn_details["txn_values"] = txn_summary["txn_values"];
+                }
+                if (txn_summary.contains("propose_pre_prepare_time")) {
+                  txn_details["propose_pre_prepare_time"] =
+                      txn_summary["propose_pre_prepare_time"];
+                }
+                if (txn_summary.contains("prepare_time")) {
+                  txn_details["prepare_time"] = txn_summary["prepare_time"];
+                }
+                if (txn_summary.contains("commit_time")) {
+                  txn_details["commit_time"] = txn_summary["commit_time"];
+                }
+                if (txn_summary.contains("execution_time")) {
+                  txn_details["execution_time"] = 
txn_summary["execution_time"];
+                }
+                result["transaction_details"] = txn_details;
+              } catch (...) {
+                LOG(ERROR) << "Failed to parse transaction summary for txn: "
+                           << txn_number;
+              }
+            }
+            
+            std::string timeline_value;
+            status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, 
&timeline_value);
+            
+            if (status.ok() && !timeline_value.empty()) {
+              try {
+                nlohmann::json events_array = 
nlohmann::json::parse(timeline_value);
+                result["timeline"] = events_array;
+              } catch (...) {
+                LOG(ERROR) << "Failed to parse timeline for txn: " << 
txn_number;
+              }
+            } else {
+              LOG(ERROR) << "Timeline not found for txn: " << txn_number;
+            }
+
+            res.body = result.dump();
+            res.end();
+          });
       app.port(8500 + transaction_summary_.port).multithreaded().run();
       sleep(1);
     } catch (const std::exception& e) {
@@ -233,6 +438,27 @@ void Stats::SetProps(int replica_id, std::string ip, int port,
   enable_resview = resview_flag;
   enable_faulty_switch_ = faulty_flag;
   if (resview_flag) {
+    // Single data directory for both summaries and timeline
+    std::string data_dir = "./resdb_data_" + std::to_string(port);
+    int mkdir_result = mkdir(data_dir.c_str(), 0755);
+    if (mkdir_result != 0 && errno != EEXIST) {
+      LOG(ERROR) << "Failed to create data directory: " << data_dir;
+    }
+    
+    // Initialize LevelDB for both transaction summaries and timeline events
+    std::string summary_path = data_dir + "/summaries";
+    leveldb::Options options;
+    options.create_if_missing = true;
+    leveldb::DB* db = nullptr;
+    leveldb::Status status = leveldb::DB::Open(options, summary_path, &db);
+    if (status.ok()) {
+      summary_db_.reset(db);
+      LOG(INFO) << "Initialized LevelDB storage at: " << summary_path;
+    } else {
+      LOG(ERROR) << "Failed to open LevelDB at " << summary_path << ": "
+                 << status.ToString();
+    }
+    
     crow_thread_ = std::thread(&Stats::CrowRoute, this);
   }
 }
@@ -344,8 +570,63 @@ void Stats::SendSummary() {
 
   summary_json_["ext_cache_hit_ratio"] =
       transaction_summary_.ext_cache_hit_ratio_;
-  consensus_history_[std::to_string(transaction_summary_.txn_number)] =
-      summary_json_;
+
+  std::string txn_key = std::to_string(transaction_summary_.txn_number);
+
+  if (summary_db_) {
+    std::lock_guard<std::mutex> lock(summary_db_mutex_);
+    
+    // Write transaction summary
+    leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key,
+                                              summary_json_.dump());
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key
+                 << ": " << status.ToString();
+    }
+    
+    // Batch write all buffered timeline events for this transaction
+    {
+      std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
+      auto it = timeline_buffer_.find(transaction_summary_.txn_number);
+      if (it != timeline_buffer_.end() && !it->second.empty()) {
+        std::string timeline_key = "timeline_" + txn_key;
+        
+        // Read existing timeline events if they exist, then merge
+        nlohmann::json events_array;
+        std::string existing_value;
+        leveldb::Status read_status = summary_db_->Get(leveldb::ReadOptions(), 
timeline_key, &existing_value);
+        if (read_status.ok() && !existing_value.empty()) {
+          try {
+            events_array = nlohmann::json::parse(existing_value);
+            // Ensure it's an array
+            if (!events_array.is_array()) {
+              events_array = nlohmann::json::array();
+            }
+          } catch (...) {
+            // If parsing fails, start with empty array
+            events_array = nlohmann::json::array();
+          }
+        } else {
+          events_array = nlohmann::json::array();
+        }
+        
+        // Append new events to existing array
+        for (const auto& event : it->second) {
+          events_array.push_back(event);
+        }
+        
+        // Write merged timeline events back to LevelDB
+        status = summary_db_->Put(leveldb::WriteOptions(), timeline_key,
+                                  events_array.dump());
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to write timeline to storage for txn: " << 
txn_key
+                     << ": " << status.ToString();
+        }
+        // Clean up buffer for this txn
+        timeline_buffer_.erase(it);
+      }
+    }
+  }
 
   LOG(ERROR) << summary_json_.dump();
 
@@ -364,6 +645,76 @@ void Stats::SendSummary() {
   summary_json_.clear();
 }
 
+void Stats::WriteTimelineEvent(uint64_t seq, const std::string& phase, int 
sender_id) {
+  if (!enable_resview) {
+    return;
+  }
+  
+  // Buffer timeline events in memory - written to LevelDB in batch during SendSummary()
+  std::lock_guard<std::mutex> lock(timeline_buffer_mutex_);
+  
+  // Create new event
+  nlohmann::json event;
+  event["timestamp"] = 
std::chrono::system_clock::now().time_since_epoch().count();
+  event["phase"] = phase;
+  if (sender_id >= 0) {
+    event["sender_id"] = sender_id;
+  }
+  
+  // Append to in-memory buffer for this seq
+  timeline_buffer_[seq].push_back(event);
+}
+
+void Stats::RecordNetworkRecv(uint64_t seq) {
+  WriteTimelineEvent(seq, "network_recv");
+}
+
+void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) {
+  WriteTimelineEvent(seq, "prepare_recv", sender_id);
+}
+
+void Stats::RecordCommitRecv(uint64_t seq, int sender_id) {
+  WriteTimelineEvent(seq, "commit_recv", sender_id);
+}
+
+void Stats::RecordExecuteStart(uint64_t seq) {
+  WriteTimelineEvent(seq, "execute_start");
+}
+
+void Stats::RecordExecuteEnd(uint64_t seq) {
+  WriteTimelineEvent(seq, "execute_end");
+}
+
+void Stats::RecordResponseSent(uint64_t seq) {
+  WriteTimelineEvent(seq, "response_sent");
+}
+
+void Stats::CleanupOldTimelineEntries() {
+  // Bound buffer growth: entries are erased when SendSummary() flushes them, so
+  // drop any entry more than 10000 sequence numbers behind the newest buffered.
+  std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
+  
+  if (timeline_buffer_.empty()) {
+    return;
+  }
+  
+  // Highest sequence number currently in the buffer (std::map keys are ordered)
+  uint64_t max_seq = timeline_buffer_.rbegin()->first;
+  uint64_t cleanup_threshold = (max_seq > 10000) ? (max_seq - 10000) : 0;
+  
+  // Remove all entries below the threshold
+  auto it = timeline_buffer_.begin();
+  while (it != timeline_buffer_.end() && it->first < cleanup_threshold) {
+    it = timeline_buffer_.erase(it);
+  }
+  
+  size_t buffer_size = timeline_buffer_.size();
+  if (buffer_size > 1000) {
+    LOG(WARNING) << "Timeline buffer size is large: " << buffer_size
+                 << " entries. This may indicate transactions not completing.";
+  }
+}
+
 void Stats::MonitorGlobal() {
   LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
 
@@ -398,6 +749,10 @@ void Stats::MonitorGlobal() {
   while (!stop_) {
     sleep(monitor_sleep_time_);
     time += monitor_sleep_time_;
+    
+    // Periodically clean up old timeline buffer entries to prevent memory leaks
+    CleanupOldTimelineEntries();
+    
     seq_fail = seq_fail_;
     socket_recv = socket_recv_;
     client_call = client_call_;
@@ -529,8 +884,10 @@ void Stats::IncPrepare() {
     prometheus_->Inc(PREPARE, 1);
   }
   num_prepare_++;
-  transaction_summary_.prepare_message_count_times_list.push_back(
-      std::chrono::system_clock::now());
+  if (enable_resview) {
+    transaction_summary_.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
 }
 
 void Stats::IncCommit() {
@@ -538,8 +895,10 @@ void Stats::IncCommit() {
     prometheus_->Inc(COMMIT, 1);
   }
   num_commit_++;
-  transaction_summary_.commit_message_count_times_list.push_back(
-      std::chrono::system_clock::now());
+  if (enable_resview) {
+    transaction_summary_.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
 }
 
 void Stats::IncPendingExecute() { pending_execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 849c83d1..a2524fc1 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -23,6 +23,7 @@
 
 #include <chrono>
 #include <future>
+#include <map>
 #include <nlohmann/json.hpp>
 
 #include "boost/asio.hpp"
@@ -33,6 +34,11 @@
 #include "proto/kv/kv.pb.h"
 #include "sys/resource.h"
 
+// Forward declaration for LevelDB
+namespace leveldb {
+class DB;
+}
+
 namespace asio = boost::asio;
 namespace beast = boost::beast;
 using tcp = asio::ip::tcp;
@@ -91,6 +97,15 @@ class Stats {
   bool IsFaulty();
   void ChangePrimary(int primary_id);
 
+  // Timeline events (buffered in memory, flushed to LevelDB; resview only)
+  void RecordNetworkRecv(uint64_t seq);
+  void RecordPrepareRecv(uint64_t seq, int sender_id);
+  void RecordCommitRecv(uint64_t seq, int sender_id);
+  void RecordExecuteStart(uint64_t seq);
+  void RecordExecuteEnd(uint64_t seq);
+  void RecordResponseSent(uint64_t seq);
+  void CleanupOldTimelineEntries();  // Prevent timeline buffer memory leak
+
   void AddLatency(uint64_t run_time);
 
   void Monitor();
@@ -159,9 +174,20 @@ class Stats {
   std::atomic<uint64_t> prev_num_prepare_;
   std::atomic<uint64_t> prev_num_commit_;
   nlohmann::json summary_json_;
-  nlohmann::json consensus_history_;
 
   std::unique_ptr<PrometheusHandler> prometheus_;
+  
+  std::unique_ptr<leveldb::DB> summary_db_;
+  std::mutex summary_db_mutex_;
+  
+  // In-memory timeline event buffer (per-seq) - batched writes to LevelDB
+  std::map<uint64_t, std::vector<nlohmann::json>> timeline_buffer_;
+  std::mutex timeline_buffer_mutex_;
+  
+  // Timeline directory for per-transaction event logs
+  std::string timeline_dir_;
+  std::mutex timeline_mutex_;
+  void WriteTimelineEvent(uint64_t seq, const std::string& phase, int 
sender_id = -1);
 };
 
 }  // namespace resdb

Reply via email to