This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new c77cc3ef timeline events addition + per sequence API
c77cc3ef is described below
commit c77cc3ef8baa5e86af47eb3fd6ff4cb93e478ea6
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 07:05:57 2026 +0000
timeline events addition + per sequence API
---
.../consensus/execution/transaction_executor.cpp | 6 +
platform/consensus/ordering/pbft/commitment.cpp | 2 +
platform/statistic/stats.cpp | 175 ++++++++++++++++++++-
platform/statistic/stats.h | 24 +++
4 files changed, 206 insertions(+), 1 deletion(-)
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 3e30d3f6..34eb3e3e 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -257,6 +257,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// id:"<<request->proxy_id();
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(batch_request);
+ global_stats_->RecordExecuteStart(request->seq());  // seq used by ExecuteBatchWithSeq below, as in Execute()
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
batch_request);
@@ -264,6 +265,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// global_stats_->IncTotalRequest(batch_request.user_requests_size());
// global_stats_->IncExecuteDone();
+ global_stats_->RecordExecuteEnd(request->seq());  // same key as RecordExecuteStart above
}
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
@@ -303,6 +305,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(*batch_request_p);
uint64_t seq = request->seq();
+ // Timeline: execution start. Recorded before the need_execute check
+ // below, so it is emitted even when transaction_manager_ does not run
+ // the batch.
+ global_stats_->RecordExecuteStart(seq);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -366,6 +370,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
post_exec_func_(std::move(request), std::move(response));
}
+ // Timeline: execution end. Uses the `seq` copied from request->seq()
+ // earlier because `request` may already have been moved into
+ // post_exec_func_ above; recorded after that callback, so the
+ // execute_start..execute_end span includes its run time when it runs.
+ global_stats_->RecordExecuteEnd(seq);
global_stats_->IncExecuteDone();
}
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 7ebb769f..0f4a970d 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -265,6 +265,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context>
context,
}
uint64_t seq = request->seq();
int sender_id = request->sender_id();
+ // Timeline: append a prepare_recv event for this seq, tagged with the
+ // replica that sent the prepare message.
+ global_stats_->RecordPrepareRecv(seq, sender_id);
global_stats_->IncPrepare(seq);
std::unique_ptr<Request> commit_request = resdb::NewRequest(
Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
@@ -309,6 +310,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
}
+ // Timeline: commit_recv event. Placed after the early-return branch
+ // above, so commit messages handed straight to AddConsensusMsg there
+ // are not recorded in the timeline.
+ global_stats_->RecordCommitRecv(seq, sender_id);
global_stats_->IncCommit(seq);
// Add request to message_manager.
// If it has received enough same requests(2f+1), message manager will
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 8957e733..dcda37dd 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -139,13 +139,64 @@ void Stats::CrowRoute() {
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
- // Send your response
+ // Send your response - includes all consensus history with timeline events
{
std::lock_guard<std::mutex> lock(consensus_history_mutex_);
res.body = consensus_history_.dump();
}
res.end();
});
+  CROW_ROUTE(app, "/consensus_data/<int>")
+      .methods("GET"_method)([this](const crow::request& req,
+                                    crow::response& res, int seq_number) {
+        // Per-sequence consensus view: merges the archived entry in
+        // consensus_history_ with the live entry (if any) in the sharded
+        // telemetry map and returns it as JSON; 404 if neither has it.
+        LOG(ERROR) << "API: Get consensus data for sequence " << seq_number;
+        res.set_header("Access-Control-Allow-Origin", "*");
+        res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
+        res.set_header("Access-Control-Allow-Headers",
+                       "Content-Type, Authorization");
+        res.set_header("Content-Type", "application/json");
+
+        nlohmann::json result;
+
+        // Sequence numbers are unsigned; a negative path value would wrap
+        // to a huge uint64_t and silently miss, so reject it explicitly.
+        if (seq_number < 0) {
+          res.code = 400;
+          result["error"] = "Invalid sequence number";
+          result["seq_number"] = seq_number;
+          res.body = result.dump();
+          res.end();
+          return;
+        }
+        const uint64_t seq = static_cast<uint64_t>(seq_number);
+
+        // Archived data first.
+        {
+          std::lock_guard<std::mutex> lock(consensus_history_mutex_);
+          auto it = consensus_history_.find(std::to_string(seq));
+          if (it != consensus_history_.end()) {
+            result = *it;
+          }
+        }
+
+        // Live telemetry (may be more recent). Use GetShardIndex() exactly
+        // like the writers do instead of hard-coding the shard count, which
+        // would pick the wrong shard (or index out of bounds) if
+        // kTelemetryShards ever changes.
+        nlohmann::json live_data;
+        {
+          const size_t shard = GetShardIndex(seq);
+          std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+          auto& map = transaction_telemetry_map_[shard];
+          auto it = map.find(seq);
+          if (it != map.end()) {
+            live_data = it->second.to_json();
+          }
+        }
+
+        if (!live_data.is_null()) {
+          if (result.is_object() && !result.empty()) {
+            // Live data wins for ordinary fields, but timeline events from
+            // both sources are kept (exact duplicates dropped) so events
+            // recorded before and after archiving are not lost. A plain
+            // update() would overwrite the archived timeline_events array.
+            nlohmann::json events = nlohmann::json::array();
+            auto append_unique = [&events](const nlohmann::json& source) {
+              auto found = source.find("timeline_events");
+              if (found == source.end() || !found->is_array()) {
+                return;
+              }
+              for (const auto& event : *found) {
+                bool seen = false;
+                for (const auto& kept : events) {
+                  if (kept == event) {
+                    seen = true;
+                    break;
+                  }
+                }
+                if (!seen) {
+                  events.push_back(event);
+                }
+              }
+            };
+            append_unique(result);
+            append_unique(live_data);
+            result.update(live_data);
+            result["timeline_events"] = std::move(events);
+          } else {
+            // No usable archived object (update() would throw on a
+            // non-object), so the live view is the answer.
+            result = std::move(live_data);
+          }
+        }
+
+        if (result.empty()) {
+          res.code = 404;
+          result["error"] = "Transaction not found";
+          result["seq_number"] = seq_number;
+        }
+
+        res.body = result.dump();
+        res.end();
+      });
CROW_ROUTE(app, "/get_status")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
@@ -352,6 +403,128 @@ void Stats::RecordStateTime(uint64_t seq, std::string
state) {
}
}
+namespace {
+
+// Appends one timeline event for `seq` to `map` (a single telemetry shard).
+// If this is the first event seen for `seq`, the entry is created and seeded
+// from `static_info` (replica/primary id, address, storage-engine stats).
+// The caller must hold the mutex guarding `map`.
+template <typename TelemetryMap, typename StaticInfo>
+void AppendTimelineEvent(TelemetryMap& map, const StaticInfo& static_info,
+                         uint64_t seq, const char* phase, int sender_id) {
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_info.replica_id;
+    telemetry.primary_id = static_info.primary_id;
+    telemetry.ip = static_info.ip;
+    telemetry.port = static_info.port;
+    telemetry.ext_cache_hit_ratio_ = static_info.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_info.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ =
+        static_info.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  // Timestamp is taken under the shard lock so events for one seq are
+  // appended in timestamp order.
+  TransactionTelemetry::TimelineEvent event;
+  event.timestamp = std::chrono::system_clock::now();
+  event.phase = phase;
+  event.sender_id = sender_id;
+  it->second.timeline_events.push_back(std::move(event));
+}
+
+}  // namespace
+
+// Records that a prepare message from `sender_id` was received for `seq`.
+void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) {
+  if (!enable_resview) {
+    return;
+  }
+  const size_t shard = GetShardIndex(seq);
+  std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+  AppendTimelineEvent(transaction_telemetry_map_[shard],
+                      static_telemetry_info_, seq, "prepare_recv", sender_id);
+}
+
+// Records that a commit message from `sender_id` was received for `seq`.
+void Stats::RecordCommitRecv(uint64_t seq, int sender_id) {
+  if (!enable_resview) {
+    return;
+  }
+  const size_t shard = GetShardIndex(seq);
+  std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+  AppendTimelineEvent(transaction_telemetry_map_[shard],
+                      static_telemetry_info_, seq, "commit_recv", sender_id);
+}
+
+// Records the start of execution for `seq` (no sender; stored as -1).
+void Stats::RecordExecuteStart(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+  const size_t shard = GetShardIndex(seq);
+  std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+  AppendTimelineEvent(transaction_telemetry_map_[shard],
+                      static_telemetry_info_, seq, "execute_start", -1);
+}
+
+// Records the end of execution for `seq` (no sender; stored as -1).
+void Stats::RecordExecuteEnd(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+  const size_t shard = GetShardIndex(seq);
+  std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+  AppendTimelineEvent(transaction_telemetry_map_[shard],
+                      static_telemetry_info_, seq, "execute_end", -1);
+}
+
void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
if (!enable_resview) {
return;
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 4a0462bd..3f711ef9 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -65,6 +65,14 @@ struct TransactionTelemetry {
commit_message_count_times_list;
std::chrono::system_clock::time_point execution_time;
+ // Timeline events for detailed tracking: one entry per consensus or
+ // execution milestone observed for this sequence number, appended in the
+ // order the Stats::Record* hooks run.
+ struct TimelineEvent {
+ // Wall-clock time (system_clock::now()) at which the event was recorded.
+ std::chrono::system_clock::time_point timestamp;
+ // Event kind; the hooks in stats.cpp currently emit "prepare_recv",
+ // "commit_recv", "execute_start" and "execute_end".
+ std::string phase;
+ // Replica that sent the triggering message, or -1 when not applicable
+ // (to_json omits the field in that case).
+ int sender_id = -1;
+ };
+ // NOTE(review): nothing visible in this change bounds this vector's size.
+ std::vector<TimelineEvent> timeline_events;
+
// Storage Engine Stats
double ext_cache_hit_ratio_ = 0.0;
std::string level_db_stats_;
@@ -111,6 +119,18 @@ struct TransactionTelemetry {
json["level_db_stats"] = level_db_stats_;
json["level_db_approx_mem_size"] = level_db_approx_mem_size_;
+ // Add timeline events: always emitted as an array (empty if none).
+ json["timeline_events"] = nlohmann::json::array();
+ for (const auto& event : timeline_events) {
+ nlohmann::json event_json;
+ // Raw system_clock tick count since epoch. The tick period is
+ // implementation-defined (e.g. nanoseconds with libstdc++), so readers
+ // must know the unit. NOTE(review): consider a duration_cast to a fixed
+ // unit if the other timestamps in this JSON can change with it.
+ event_json["timestamp"] = event.timestamp.time_since_epoch().count();
+ event_json["phase"] = event.phase;
+ // sender_id is omitted for events without a sender (stored as -1).
+ if (event.sender_id >= 0) {
+ event_json["sender_id"] = event.sender_id;
+ }
+ json["timeline_events"].push_back(event_json);
+ }
+
return json;
}
};
@@ -163,6 +183,10 @@ class Stats {
std::string level_db_approx_mem_size);
void RecordStateTime(std::string state);
void RecordStateTime(uint64_t seq, std::string state);
+ // Timeline hooks (no-ops unless enable_resview): each appends a
+ // timestamped event to the telemetry entry for `seq`, creating the entry
+ // if it does not exist yet. sender_id identifies the sending replica.
+ void RecordPrepareRecv(uint64_t seq, int sender_id);
+ void RecordCommitRecv(uint64_t seq, int sender_id);
+ void RecordExecuteStart(uint64_t seq);
+ void RecordExecuteEnd(uint64_t seq);
void GetTransactionDetails(BatchUserRequest batch_request);
void SendSummary();
void SendSummary(uint64_t seq);