This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit c77cc3ef8baa5e86af47eb3fd6ff4cb93e478ea6 Author: bchou9 <[email protected]> AuthorDate: Thu Jan 8 07:05:57 2026 +0000 timeline events addition + per sequence API --- .../consensus/execution/transaction_executor.cpp | 6 + platform/consensus/ordering/pbft/commitment.cpp | 2 + platform/statistic/stats.cpp | 175 ++++++++++++++++++++- platform/statistic/stats.h | 24 +++ 4 files changed, 206 insertions(+), 1 deletion(-) diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index 3e30d3f6..34eb3e3e 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -257,6 +257,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) { // id:"<<request->proxy_id(); std::unique_ptr<BatchUserResponse> response; global_stats_->GetTransactionDetails(batch_request); + global_stats_->RecordExecuteStart(batch_request.seq()); if (transaction_manager_) { response = transaction_manager_->ExecuteBatchWithSeq(request->seq(), batch_request); @@ -264,6 +265,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) { // global_stats_->IncTotalRequest(batch_request.user_requests_size()); // global_stats_->IncExecuteDone(); + global_stats_->RecordExecuteEnd(batch_request.seq()); } void TransactionExecutor::Execute(std::unique_ptr<Request> request, @@ -303,6 +305,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, std::unique_ptr<BatchUserResponse> response; global_stats_->GetTransactionDetails(*batch_request_p); uint64_t seq = request->seq(); + // Timeline: execution start + global_stats_->RecordExecuteStart(seq); if (transaction_manager_ && need_execute) { if (execute_thread_num_ == 1) { response = transaction_manager_->ExecuteBatchWithSeq(request->seq(), @@ -366,6 +370,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, post_exec_func_(std::move(request), 
std::move(response)); } + // Timeline: execution end + global_stats_->RecordExecuteEnd(seq); global_stats_->IncExecuteDone(); } diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 7ebb769f..0f4a970d 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -265,6 +265,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, } uint64_t seq = request->seq(); int sender_id = request->sender_id(); + global_stats_->RecordPrepareRecv(seq, sender_id); global_stats_->IncPrepare(seq); std::unique_ptr<Request> commit_request = resdb::NewRequest( Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id()); @@ -309,6 +310,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } + global_stats_->RecordCommitRecv(seq, sender_id); global_stats_->IncCommit(seq); // Add request to message_manager. 
// If it has received enough same requests(2f+1), message manager will diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 8957e733..dcda37dd 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -139,13 +139,64 @@ void Stats::CrowRoute() { "Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers - // Send your response + // Send your response - includes all consensus history with timeline events { std::lock_guard<std::mutex> lock(consensus_history_mutex_); res.body = consensus_history_.dump(); } res.end(); }); + CROW_ROUTE(app, "/consensus_data/<int>") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res, int seq_number) { + LOG(ERROR) << "API: Get consensus data for sequence " << seq_number; + res.set_header("Access-Control-Allow-Origin", "*"); + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", + "Content-Type, Authorization"); + + nlohmann::json result; + + // First check consensus_history_ + { + std::lock_guard<std::mutex> lock(consensus_history_mutex_); + std::string seq_str = std::to_string(seq_number); + if (consensus_history_.find(seq_str) != consensus_history_.end()) { + result = consensus_history_[seq_str]; + } + } + + // Also check live telemetry map (may have more recent data) + uint64_t seq = static_cast<uint64_t>(seq_number); + size_t shard = seq % 256; // kTelemetryShards + { + std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]); + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it != map.end()) { + // Merge live telemetry data (may be more complete) + nlohmann::json live_data = it->second.to_json(); + + // If we have data from consensus_history_, merge it + if (!result.empty()) { + // Prefer live data: update() overwrites overlapping keys, so + // timeline_events comes from live telemetry only, not from both + result.update(live_data); + } else { + result = live_data; + } + } + } + + 
if (result.empty()) { + res.code = 404; + result["error"] = "Transaction not found"; + result["seq_number"] = seq_number; + } + + res.body = result.dump(); + res.end(); + }); CROW_ROUTE(app, "/get_status") .methods("GET"_method)([this](const crow::request& req, crow::response& res) { @@ -352,6 +403,128 @@ void Stats::RecordStateTime(uint64_t seq, std::string state) { } } +void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) { + if (!enable_resview) { + return; + } + + size_t shard = GetShardIndex(seq); + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it == map.end()) { + // Initialize with static info + TransactionTelemetry telemetry; + telemetry.replica_id = static_telemetry_info_.replica_id; + telemetry.primary_id = static_telemetry_info_.primary_id; + telemetry.ip = static_telemetry_info_.ip; + telemetry.port = static_telemetry_info_.port; + telemetry.ext_cache_hit_ratio_ = static_telemetry_info_.ext_cache_hit_ratio_; + telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_; + telemetry.level_db_approx_mem_size_ = static_telemetry_info_.level_db_approx_mem_size_; + telemetry.txn_number = seq; + it = map.emplace(seq, std::move(telemetry)).first; + } + + // Add timeline event + TransactionTelemetry::TimelineEvent event; + event.timestamp = std::chrono::system_clock::now(); + event.phase = "prepare_recv"; + event.sender_id = sender_id; + it->second.timeline_events.push_back(event); +} + +void Stats::RecordCommitRecv(uint64_t seq, int sender_id) { + if (!enable_resview) { + return; + } + + size_t shard = GetShardIndex(seq); + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it == map.end()) { + // Initialize with static info + TransactionTelemetry telemetry; + telemetry.replica_id = static_telemetry_info_.replica_id; + telemetry.primary_id = 
static_telemetry_info_.primary_id; + telemetry.ip = static_telemetry_info_.ip; + telemetry.port = static_telemetry_info_.port; + telemetry.ext_cache_hit_ratio_ = static_telemetry_info_.ext_cache_hit_ratio_; + telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_; + telemetry.level_db_approx_mem_size_ = static_telemetry_info_.level_db_approx_mem_size_; + telemetry.txn_number = seq; + it = map.emplace(seq, std::move(telemetry)).first; + } + + // Add timeline event + TransactionTelemetry::TimelineEvent event; + event.timestamp = std::chrono::system_clock::now(); + event.phase = "commit_recv"; + event.sender_id = sender_id; + it->second.timeline_events.push_back(event); +} + +void Stats::RecordExecuteStart(uint64_t seq) { + if (!enable_resview) { + return; + } + + size_t shard = GetShardIndex(seq); + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it == map.end()) { + TransactionTelemetry telemetry; + telemetry.replica_id = static_telemetry_info_.replica_id; + telemetry.primary_id = static_telemetry_info_.primary_id; + telemetry.ip = static_telemetry_info_.ip; + telemetry.port = static_telemetry_info_.port; + telemetry.ext_cache_hit_ratio_ = static_telemetry_info_.ext_cache_hit_ratio_; + telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_; + telemetry.level_db_approx_mem_size_ = static_telemetry_info_.level_db_approx_mem_size_; + telemetry.txn_number = seq; + it = map.emplace(seq, std::move(telemetry)).first; + } + + TransactionTelemetry::TimelineEvent event; + event.timestamp = std::chrono::system_clock::now(); + event.phase = "execute_start"; + it->second.timeline_events.push_back(event); +} + +void Stats::RecordExecuteEnd(uint64_t seq) { + if (!enable_resview) { + return; + } + + size_t shard = GetShardIndex(seq); + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + + auto& map = transaction_telemetry_map_[shard]; + auto it = 
map.find(seq); + if (it == map.end()) { + TransactionTelemetry telemetry; + telemetry.replica_id = static_telemetry_info_.replica_id; + telemetry.primary_id = static_telemetry_info_.primary_id; + telemetry.ip = static_telemetry_info_.ip; + telemetry.port = static_telemetry_info_.port; + telemetry.ext_cache_hit_ratio_ = static_telemetry_info_.ext_cache_hit_ratio_; + telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_; + telemetry.level_db_approx_mem_size_ = static_telemetry_info_.level_db_approx_mem_size_; + telemetry.txn_number = seq; + it = map.emplace(seq, std::move(telemetry)).first; + } + + TransactionTelemetry::TimelineEvent event; + event.timestamp = std::chrono::system_clock::now(); + event.phase = "execute_end"; + it->second.timeline_events.push_back(event); +} + void Stats::GetTransactionDetails(BatchUserRequest batch_request) { if (!enable_resview) { return; diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 4a0462bd..3f711ef9 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -65,6 +65,14 @@ struct TransactionTelemetry { commit_message_count_times_list; std::chrono::system_clock::time_point execution_time; + // Timeline events for detailed tracking + struct TimelineEvent { + std::chrono::system_clock::time_point timestamp; + std::string phase; + int sender_id = -1; + }; + std::vector<TimelineEvent> timeline_events; + // Storage Engine Stats double ext_cache_hit_ratio_ = 0.0; std::string level_db_stats_; @@ -111,6 +119,18 @@ struct TransactionTelemetry { json["level_db_stats"] = level_db_stats_; json["level_db_approx_mem_size"] = level_db_approx_mem_size_; + // Add timeline events + json["timeline_events"] = nlohmann::json::array(); + for (const auto& event : timeline_events) { + nlohmann::json event_json; + event_json["timestamp"] = event.timestamp.time_since_epoch().count(); + event_json["phase"] = event.phase; + if (event.sender_id >= 0) { + event_json["sender_id"] = event.sender_id; + } + 
json["timeline_events"].push_back(event_json); + } + return json; } }; @@ -163,6 +183,10 @@ class Stats { std::string level_db_approx_mem_size); void RecordStateTime(std::string state); void RecordStateTime(uint64_t seq, std::string state); + void RecordPrepareRecv(uint64_t seq, int sender_id); + void RecordCommitRecv(uint64_t seq, int sender_id); + void RecordExecuteStart(uint64_t seq); + void RecordExecuteEnd(uint64_t seq); void GetTransactionDetails(BatchUserRequest batch_request); void SendSummary(); void SendSummary(uint64_t seq);
