This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch demo-final
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit c77cc3ef8baa5e86af47eb3fd6ff4cb93e478ea6
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 07:05:57 2026 +0000

    timeline events addition + per sequence API
---
 .../consensus/execution/transaction_executor.cpp   |   6 +
 platform/consensus/ordering/pbft/commitment.cpp    |   2 +
 platform/statistic/stats.cpp                       | 175 ++++++++++++++++++++-
 platform/statistic/stats.h                         |  24 +++
 4 files changed, 206 insertions(+), 1 deletion(-)

diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index 3e30d3f6..34eb3e3e 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -257,6 +257,7 @@ void 
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
   //          id:"<<request->proxy_id();
   std::unique_ptr<BatchUserResponse> response;
   global_stats_->GetTransactionDetails(batch_request);
+  global_stats_->RecordExecuteStart(batch_request.seq());
   if (transaction_manager_) {
     response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
                                                          batch_request);
@@ -264,6 +265,7 @@ void 
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
 
   // global_stats_->IncTotalRequest(batch_request.user_requests_size());
   // global_stats_->IncExecuteDone();
+  global_stats_->RecordExecuteEnd(batch_request.seq());
 }
 
 void TransactionExecutor::Execute(std::unique_ptr<Request> request,
@@ -303,6 +305,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
   std::unique_ptr<BatchUserResponse> response;
   global_stats_->GetTransactionDetails(*batch_request_p);
   uint64_t seq = request->seq();
+  // Timeline: execution start
+  global_stats_->RecordExecuteStart(seq);
   if (transaction_manager_ && need_execute) {
     if (execute_thread_num_ == 1) {
       response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -366,6 +370,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
     post_exec_func_(std::move(request), std::move(response));
   }
 
+  // Timeline: execution end
+  global_stats_->RecordExecuteEnd(seq);
   global_stats_->IncExecuteDone();
 }
 
diff --git a/platform/consensus/ordering/pbft/commitment.cpp 
b/platform/consensus/ordering/pbft/commitment.cpp
index 7ebb769f..0f4a970d 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -265,6 +265,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> 
context,
   }
   uint64_t seq = request->seq();
   int sender_id = request->sender_id();
+  global_stats_->RecordPrepareRecv(seq, sender_id);
   global_stats_->IncPrepare(seq);
   std::unique_ptr<Request> commit_request = resdb::NewRequest(
       Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
@@ -309,6 +310,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> 
context,
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
   }
+  global_stats_->RecordCommitRecv(seq, sender_id);
   global_stats_->IncCommit(seq);
   // Add request to message_manager.
   // If it has received enough same requests(2f+1), message manager will
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 8957e733..dcda37dd 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -139,13 +139,64 @@ void Stats::CrowRoute() {
                 "Access-Control-Allow-Headers",
                 "Content-Type, Authorization");  // Specify allowed headers
 
-            // Send your response
+            // Send your response - includes all consensus history with 
timeline events
             {
               std::lock_guard<std::mutex> lock(consensus_history_mutex_);
               res.body = consensus_history_.dump();
             }
             res.end();
           });
+      CROW_ROUTE(app, "/consensus_data/<int>")
+          .methods("GET"_method)([this](const crow::request& req,
+                                        crow::response& res, int seq_number) {
+            LOG(ERROR) << "API: Get consensus data for sequence " << 
seq_number;
+            res.set_header("Access-Control-Allow-Origin", "*");
+            res.set_header("Access-Control-Allow-Methods",
+                           "GET, POST, OPTIONS");
+            res.set_header("Access-Control-Allow-Headers",
+                           "Content-Type, Authorization");
+
+            nlohmann::json result;
+            
+            // First check consensus_history_
+            {
+              std::lock_guard<std::mutex> lock(consensus_history_mutex_);
+              std::string seq_str = std::to_string(seq_number);
+              if (consensus_history_.find(seq_str) != 
consensus_history_.end()) {
+                result = consensus_history_[seq_str];
+              }
+            }
+            
+            // Also check live telemetry map (may have more recent data)
+            uint64_t seq = static_cast<uint64_t>(seq_number);
+            size_t shard = seq % 256;  // kTelemetryShards
+            {
+              std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+              auto& map = transaction_telemetry_map_[shard];
+              auto it = map.find(seq);
+              if (it != map.end()) {
+                // Merge live telemetry data (may be more complete)
+                nlohmann::json live_data = it->second.to_json();
+                
+                // If we have data from consensus_history_, merge it
+                if (!result.empty()) {
+                  // Prefer live data for most fields, but keep timeline 
events from both
+                  result.update(live_data);
+                } else {
+                  result = live_data;
+                }
+              }
+            }
+            
+            if (result.empty()) {
+              res.code = 404;
+              result["error"] = "Transaction not found";
+              result["seq_number"] = seq_number;
+            }
+            
+            res.body = result.dump();
+            res.end();
+          });
       CROW_ROUTE(app, "/get_status")
           .methods("GET"_method)([this](const crow::request& req,
                                         crow::response& res) {
@@ -352,6 +403,128 @@ void Stats::RecordStateTime(uint64_t seq, std::string 
state) {
   }
 }
 
+void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) {
+  if (!enable_resview) {
+    return;
+  }
+  
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+  
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    // Initialize with static info
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ = 
static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ = 
static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+  
+  // Add timeline event
+  TransactionTelemetry::TimelineEvent event;
+  event.timestamp = std::chrono::system_clock::now();
+  event.phase = "prepare_recv";
+  event.sender_id = sender_id;
+  it->second.timeline_events.push_back(event);
+}
+
+void Stats::RecordCommitRecv(uint64_t seq, int sender_id) {
+  if (!enable_resview) {
+    return;
+  }
+  
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+  
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    // Initialize with static info
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ = 
static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ = 
static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+  
+  // Add timeline event
+  TransactionTelemetry::TimelineEvent event;
+  event.timestamp = std::chrono::system_clock::now();
+  event.phase = "commit_recv";
+  event.sender_id = sender_id;
+  it->second.timeline_events.push_back(event);
+}
+
+void Stats::RecordExecuteStart(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ = 
static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ = 
static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  TransactionTelemetry::TimelineEvent event;
+  event.timestamp = std::chrono::system_clock::now();
+  event.phase = "execute_start";
+  it->second.timeline_events.push_back(event);
+}
+
+void Stats::RecordExecuteEnd(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ = 
static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ = 
static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  TransactionTelemetry::TimelineEvent event;
+  event.timestamp = std::chrono::system_clock::now();
+  event.phase = "execute_end";
+  it->second.timeline_events.push_back(event);
+}
+
 void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
   if (!enable_resview) {
     return;
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 4a0462bd..3f711ef9 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -65,6 +65,14 @@ struct TransactionTelemetry {
       commit_message_count_times_list;
   std::chrono::system_clock::time_point execution_time;
 
+  // Timeline events for detailed tracking
+  struct TimelineEvent {
+    std::chrono::system_clock::time_point timestamp;
+    std::string phase;
+    int sender_id = -1;
+  };
+  std::vector<TimelineEvent> timeline_events;
+
   // Storage Engine Stats
   double ext_cache_hit_ratio_ = 0.0;
   std::string level_db_stats_;
@@ -111,6 +119,18 @@ struct TransactionTelemetry {
     json["level_db_stats"] = level_db_stats_;
     json["level_db_approx_mem_size"] = level_db_approx_mem_size_;
 
+    // Add timeline events
+    json["timeline_events"] = nlohmann::json::array();
+    for (const auto& event : timeline_events) {
+      nlohmann::json event_json;
+      event_json["timestamp"] = event.timestamp.time_since_epoch().count();
+      event_json["phase"] = event.phase;
+      if (event.sender_id >= 0) {
+        event_json["sender_id"] = event.sender_id;
+      }
+      json["timeline_events"].push_back(event_json);
+    }
+
     return json;
   }
 };
@@ -163,6 +183,10 @@ class Stats {
                                std::string level_db_approx_mem_size);
   void RecordStateTime(std::string state);
   void RecordStateTime(uint64_t seq, std::string state);
+  void RecordPrepareRecv(uint64_t seq, int sender_id);
+  void RecordCommitRecv(uint64_t seq, int sender_id);
+  void RecordExecuteStart(uint64_t seq);
+  void RecordExecuteEnd(uint64_t seq);
   void GetTransactionDetails(BatchUserRequest batch_request);
   void SendSummary();
   void SendSummary(uint64_t seq);

Reply via email to