This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new a3330963 Added back stats changes
a3330963 is described below
commit a3330963e64aee95646e8643e166e91cf6927f02
Author: bchou9 <[email protected]>
AuthorDate: Wed Jan 7 20:25:19 2026 +0000
Added back stats changes
---
.gitignore | 3 +-
.../consensus/execution/transaction_executor.cpp | 4 +
platform/consensus/ordering/pbft/commitment.cpp | 10 +-
platform/networkstrate/service_network.cpp | 12 +
platform/statistic/BUILD | 5 +
platform/statistic/stats.cpp | 403 +++++++++++++++++++--
platform/statistic/stats.h | 28 +-
7 files changed, 438 insertions(+), 27 deletions(-)
diff --git a/.gitignore b/.gitignore
index be0ae77a..d4b8448d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,4 +21,5 @@ resdb/
100*_db/
gmon.out
.history/
-*_db/
\ No newline at end of file
+*_db/
+resdb_data_*/*
\ No newline at end of file
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 0a7401bc..71c228a0 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -302,6 +302,8 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(*batch_request_p);
+ uint64_t seq = request->seq();
+ global_stats_->RecordExecuteStart(seq);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -354,6 +356,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
if (response == nullptr) {
response = std::make_unique<BatchUserResponse>();
}
+ global_stats_->RecordExecuteEnd(seq);
global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
response->set_proxy_id(batch_request_p->proxy_id());
response->set_createtime(batch_request_p->createtime() +
@@ -363,6 +366,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
response->set_seq(request->seq());
if (post_exec_func_) {
post_exec_func_(std::move(request), std::move(response));
+ global_stats_->RecordResponseSent(seq);
}
global_stats_->IncExecuteDone();
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 463d0161..f8ab3d9d 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -263,14 +263,16 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
}
return ret;
}
- // global_stats_->IncPrepare();
+ global_stats_->IncPrepare();
+ uint64_t seq = request->seq();
+ int sender_id = request->sender_id();
+ global_stats_->RecordPrepareRecv(seq, sender_id);
std::unique_ptr<Request> commit_request = resdb::NewRequest(
Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
commit_request->mutable_data_signature()->Clear();
// Add request to message_manager.
// If it has received enough same requests(2f+1), broadcast the commit
// message.
- uint64_t seq = request->seq();
CollectorResultCode ret =
message_manager_->AddConsensusMsg(context->signature,
std::move(request));
if (ret == CollectorResultCode::STATE_CHANGED) {
@@ -303,11 +305,13 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
return -2;
}
uint64_t seq = request->seq();
+ int sender_id = request->sender_id();
if (request->is_recovery()) {
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
}
- // global_stats_->IncCommit();
+ global_stats_->IncCommit();
+ global_stats_->RecordCommitRecv(seq, sender_id);
// Add request to message_manager.
// If it has received enough same requests(2f+1), message manager will
// commit the request.
diff --git a/platform/networkstrate/service_network.cpp b/platform/networkstrate/service_network.cpp
index 1e69fd25..0bc60579 100644
--- a/platform/networkstrate/service_network.cpp
+++ b/platform/networkstrate/service_network.cpp
@@ -26,6 +26,7 @@
#include "platform/common/network/tcp_socket.h"
#include "platform/proto/broadcast.pb.h"
+#include "platform/proto/resdb.pb.h"
namespace resdb {
@@ -73,6 +74,17 @@ void ServiceNetwork::AcceptorHandler(const char* buffer, size_t data_len) {
item->data = std::move(sub_request_info);
// LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data
// len:"<<item->data->data_len;
+
+ // Try to extract seq from request for timeline tracking
+ try {
+ Request request;
+ if (request.ParseFromArray(sub_data.data(), sub_data.size())) {
+ global_stats_->RecordNetworkRecv(request.seq());
+ }
+ } catch (...) {
+ // Ignore parse errors, seq extraction is optional
+ }
+
global_stats_->ServerCall();
input_queue_.Push(std::move(item));
}
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 92328f16..042668e1 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -26,6 +26,10 @@ cc_library(
name = "stats",
srcs = ["stats.cpp"],
hdrs = ["stats.h"],
+ copts = select({
+ "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"],
+ "//conditions:default": [],
+ }),
deps = [
":prometheus_handler",
"//common:asio",
@@ -38,6 +42,7 @@ cc_library(
"//proto/kv:kv_cc_proto",
"//third_party:crow",
"//third_party:prometheus",
+ "//third_party:leveldb",
],
)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 21f1524f..5b4e3aec 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -22,8 +22,15 @@
#include <glog/logging.h>
#include <ctime>
+#include <sstream>
+#include <fstream>
+#include <sys/stat.h>
+#include <errno.h>
#include "common/utils/utils.h"
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/write_batch.h"
#include "proto/kv/kv.pb.h"
namespace asio = boost::asio;
@@ -124,22 +131,99 @@ void Stats::CrowRoute() {
crow::SimpleApp app;
while (!stop_) {
try {
+ //Deprecated
CROW_ROUTE(app, "/consensus_data")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 1";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- // Send your response
- res.body = consensus_history_.dump();
- res.end();
- });
+ .methods("GET"_method)(
+ [this](const crow::request& req, crow::response& res) {
+ LOG(ERROR) << "API 1";
+ res.set_header("Access-Control-Allow-Origin", "*");
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS");
+ res.set_header("Access-Control-Allow-Headers",
+ "Content-Type, Authorization");
+
+ int limit = 10;
+ int page = 1;
+
+ std::string url = req.url;
+ size_t query_pos = url.find('?');
+ if (query_pos != std::string::npos) {
+ std::string query_string = url.substr(query_pos + 1);
+ std::istringstream iss(query_string);
+ std::string param;
+ while (std::getline(iss, param, '&')) {
+ size_t eq_pos = param.find('=');
+ if (eq_pos != std::string::npos) {
+ std::string key = param.substr(0, eq_pos);
+ std::string value = param.substr(eq_pos + 1);
+ if (key == "limit") {
+ try {
+ limit = std::stoi(value);
+ if (limit <= 0) limit = 10;
+ } catch (...) {
+ limit = 10;
+ }
+ } else if (key == "page") {
+ try {
+ page = std::stoi(value);
+ if (page <= 0) page = 1;
+ } catch (...) {
+ page = 1;
+ }
+ }
+ }
+ }
+ }
+
+ nlohmann::json result;
+ int total_count = 0;
+ int offset = (page - 1) * limit;
+ int current_index = 0;
+ int items_collected = 0;
+
+ if (summary_db_) {
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+ leveldb::Iterator* it =
+ summary_db_->NewIterator(leveldb::ReadOptions());
+
+ // Count only non-timeline entries
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ std::string key = it->key().ToString();
+ if (key.find("timeline_") != 0) {
+ total_count++;
+ }
+ }
+
+ // Iterate and collect only non-timeline entries
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ std::string key = it->key().ToString();
+ // Skip timeline entries
+ if (key.find("timeline_") == 0) {
+ continue;
+ }
+
+ if (current_index >= offset && items_collected < limit) {
+ try {
+ result[key] =
+ nlohmann::json::parse(it->value().ToString());
+ items_collected++;
+ } catch (...) {
+ res.code = 500;
+ result["error"] = "Failed to parse transaction data";
+ delete it;
+ res.body = result.dump();
+ res.end();
+ return;
+ }
+ }
+ current_index++;
+ }
+ delete it;
+ }
+
+ res.body = result.dump();
+ res.end();
+ });
CROW_ROUTE(app, "/get_status")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
@@ -168,11 +252,12 @@ void Stats::CrowRoute() {
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
+ res.body = "Not Enabled";
// Send your response
if (enable_faulty_switch_) {
make_faulty_.store(!make_faulty_.load());
+ res.body = "Success";
}
- res.body = "Success";
res.end();
});
CROW_ROUTE(app, "/transaction_data")
@@ -210,6 +295,126 @@ void Stats::CrowRoute() {
mem_view_json.clear();
res.end();
});
+ CROW_ROUTE(app, "/consensus_data/<int>")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res, int txn_number) {
+ LOG(ERROR) << "API 5: Get transaction " << txn_number;
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ nlohmann::json result;
+ if (summary_db_) {
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+ std::string txn_key = std::to_string(txn_number);
+ std::string value;
+ leveldb::Status status =
+ summary_db_->Get(leveldb::ReadOptions(), txn_key, &value);
+
+ if (status.ok() && !value.empty()) {
+ try {
+ result[txn_key] = nlohmann::json::parse(value);
+ } catch (...) {
+ res.code = 500;
+ result["error"] = "Failed to parse transaction data";
+ }
+ } else {
+ res.code = 404;
+ result["error"] = "Transaction not found";
+ result["txn_number"] = txn_number;
+ }
+ } else {
+ res.code = 503;
+ result["error"] = "Storage not initialized";
+ }
+
+ res.body = result.dump();
+ res.end();
+ });
+
+ CROW_ROUTE(app, "/transaction_timeline/<int>")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res, int txn_number) {
+ LOG(ERROR) << "API 6: Get transaction timeline " << txn_number;
+ res.set_header("Access-Control-Allow-Origin", "*");
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS");
+ res.set_header("Access-Control-Allow-Headers",
+ "Content-Type, Authorization");
+
+ nlohmann::json result;
+ result["txn_number"] = txn_number;
+
+ if (!summary_db_) {
+ res.code = 503;
+ result["error"] = "Storage not initialized";
+ res.body = result.dump();
+ res.end();
+ return;
+ }
+
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+ std::string txn_key = std::to_string(txn_number);
+ std::string timeline_key = "timeline_" + txn_key;
+
+ std::string txn_value;
+ leveldb::Status status =
+ summary_db_->Get(leveldb::ReadOptions(), txn_key, &txn_value);
+
+ if (status.ok() && !txn_value.empty()) {
+ try {
+ nlohmann::json txn_summary = nlohmann::json::parse(txn_value);
+ nlohmann::json txn_details;
+ if (txn_summary.contains("txn_commands")) {
+ txn_details["txn_commands"] = txn_summary["txn_commands"];
+ }
+ if (txn_summary.contains("txn_keys")) {
+ txn_details["txn_keys"] = txn_summary["txn_keys"];
+ }
+ if (txn_summary.contains("txn_values")) {
+ txn_details["txn_values"] = txn_summary["txn_values"];
+ }
+ if (txn_summary.contains("propose_pre_prepare_time")) {
+ txn_details["propose_pre_prepare_time"] =
+ txn_summary["propose_pre_prepare_time"];
+ }
+ if (txn_summary.contains("prepare_time")) {
+ txn_details["prepare_time"] = txn_summary["prepare_time"];
+ }
+ if (txn_summary.contains("commit_time")) {
+ txn_details["commit_time"] = txn_summary["commit_time"];
+ }
+ if (txn_summary.contains("execution_time")) {
+                  txn_details["execution_time"] = txn_summary["execution_time"];
+ }
+ result["transaction_details"] = txn_details;
+ } catch (...) {
+ LOG(ERROR) << "Failed to parse transaction summary for txn: "
+ << txn_number;
+ }
+ }
+
+ std::string timeline_value;
+            status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, &timeline_value);
+
+ if (status.ok() && !timeline_value.empty()) {
+ try {
+              nlohmann::json events_array = nlohmann::json::parse(timeline_value);
+ result["timeline"] = events_array;
+ } catch (...) {
+              LOG(ERROR) << "Failed to parse timeline for txn: " << txn_number;
+ }
+ } else {
+ LOG(ERROR) << "Timeline not found for txn: " << txn_number;
+ }
+
+ res.body = result.dump();
+ res.end();
+ });
app.port(8500 + transaction_summary_.port).multithreaded().run();
sleep(1);
} catch (const std::exception& e) {
@@ -233,6 +438,27 @@ void Stats::SetProps(int replica_id, std::string ip, int port,
enable_resview = resview_flag;
enable_faulty_switch_ = faulty_flag;
if (resview_flag) {
+ // Single data directory for both summaries and timeline
+ std::string data_dir = "./resdb_data_" + std::to_string(port);
+ int mkdir_result = mkdir(data_dir.c_str(), 0755);
+ if (mkdir_result != 0 && errno != EEXIST) {
+ LOG(ERROR) << "Failed to create data directory: " << data_dir;
+ }
+
+ // Initialize LevelDB for both transaction summaries and timeline events
+ std::string summary_path = data_dir + "/summaries";
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::DB* db = nullptr;
+ leveldb::Status status = leveldb::DB::Open(options, summary_path, &db);
+ if (status.ok()) {
+ summary_db_.reset(db);
+ LOG(INFO) << "Initialized LevelDB storage at: " << summary_path;
+ } else {
+ LOG(ERROR) << "Failed to open LevelDB at " << summary_path << ": "
+ << status.ToString();
+ }
+
crow_thread_ = std::thread(&Stats::CrowRoute, this);
}
}
@@ -344,8 +570,63 @@ void Stats::SendSummary() {
summary_json_["ext_cache_hit_ratio"] =
transaction_summary_.ext_cache_hit_ratio_;
- consensus_history_[std::to_string(transaction_summary_.txn_number)] =
- summary_json_;
+
+ std::string txn_key = std::to_string(transaction_summary_.txn_number);
+
+ if (summary_db_) {
+ std::lock_guard<std::mutex> lock(summary_db_mutex_);
+
+ // Write transaction summary
+ leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key,
+ summary_json_.dump());
+ if (!status.ok()) {
+ LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key
+ << ": " << status.ToString();
+ }
+
+ // Batch write all buffered timeline events for this transaction
+ {
+ std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
+ auto it = timeline_buffer_.find(transaction_summary_.txn_number);
+ if (it != timeline_buffer_.end() && !it->second.empty()) {
+ std::string timeline_key = "timeline_" + txn_key;
+
+ // Read existing timeline events if they exist, then merge
+ nlohmann::json events_array;
+ std::string existing_value;
+      leveldb::Status read_status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, &existing_value);
+ if (read_status.ok() && !existing_value.empty()) {
+ try {
+ events_array = nlohmann::json::parse(existing_value);
+ // Ensure it's an array
+ if (!events_array.is_array()) {
+ events_array = nlohmann::json::array();
+ }
+ } catch (...) {
+ // If parsing fails, start with empty array
+ events_array = nlohmann::json::array();
+ }
+ } else {
+ events_array = nlohmann::json::array();
+ }
+
+ // Append new events to existing array
+ for (const auto& event : it->second) {
+ events_array.push_back(event);
+ }
+
+ // Write merged timeline events back to LevelDB
+ status = summary_db_->Put(leveldb::WriteOptions(), timeline_key,
+ events_array.dump());
+ if (!status.ok()) {
+        LOG(ERROR) << "Failed to write timeline to storage for txn: " << txn_key
+ << ": " << status.ToString();
+ }
+ // Clean up buffer for this txn
+ timeline_buffer_.erase(it);
+ }
+ }
+ }
LOG(ERROR) << summary_json_.dump();
@@ -364,6 +645,76 @@ void Stats::SendSummary() {
summary_json_.clear();
}
+void Stats::WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id) {
+ if (!enable_resview) {
+ return;
+ }
+
+  // Buffer timeline events in memory - written to LevelDB in batch during SendSummary()
+ std::lock_guard<std::mutex> lock(timeline_buffer_mutex_);
+
+ // Create new event
+ nlohmann::json event;
+  event["timestamp"] = std::chrono::system_clock::now().time_since_epoch().count();
+ event["phase"] = phase;
+ if (sender_id >= 0) {
+ event["sender_id"] = sender_id;
+ }
+
+ // Append to in-memory buffer for this seq
+ timeline_buffer_[seq].push_back(event);
+}
+
+void Stats::RecordNetworkRecv(uint64_t seq) {
+ WriteTimelineEvent(seq, "network_recv");
+}
+
+void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) {
+ WriteTimelineEvent(seq, "prepare_recv", sender_id);
+}
+
+void Stats::RecordCommitRecv(uint64_t seq, int sender_id) {
+ WriteTimelineEvent(seq, "commit_recv", sender_id);
+}
+
+void Stats::RecordExecuteStart(uint64_t seq) {
+ WriteTimelineEvent(seq, "execute_start");
+}
+
+void Stats::RecordExecuteEnd(uint64_t seq) {
+ WriteTimelineEvent(seq, "execute_end");
+}
+
+void Stats::RecordResponseSent(uint64_t seq) {
+ WriteTimelineEvent(seq, "response_sent");
+}
+
+void Stats::CleanupOldTimelineEntries() {
+ // Periodically clean up old buffer entries to prevent memory leaks
+ // Keep only entries within the last 10000 sequence numbers
+ std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
+
+ if (timeline_buffer_.empty()) {
+ return;
+ }
+
+ // Get the highest sequence number we've seen
+ uint64_t max_seq = timeline_buffer_.rbegin()->first;
+ uint64_t cleanup_threshold = (max_seq > 10000) ? (max_seq - 10000) : 0;
+
+ // Remove all entries below the threshold
+ auto it = timeline_buffer_.begin();
+ while (it != timeline_buffer_.end() && it->first < cleanup_threshold) {
+ it = timeline_buffer_.erase(it);
+ }
+
+ size_t buffer_size = timeline_buffer_.size();
+ if (buffer_size > 1000) {
+ LOG(WARNING) << "Timeline buffer size is large: " << buffer_size
+ << " entries. This may indicate transactions not completing.";
+ }
+}
+
void Stats::MonitorGlobal() {
LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
@@ -398,6 +749,10 @@ void Stats::MonitorGlobal() {
while (!stop_) {
sleep(monitor_sleep_time_);
time += monitor_sleep_time_;
+
+    // Periodically clean up old timeline buffer entries to prevent memory leaks
+ CleanupOldTimelineEntries();
+
seq_fail = seq_fail_;
socket_recv = socket_recv_;
client_call = client_call_;
@@ -529,8 +884,10 @@ void Stats::IncPrepare() {
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
- transaction_summary_.prepare_message_count_times_list.push_back(
- std::chrono::system_clock::now());
+ if (enable_resview) {
+ transaction_summary_.prepare_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
+ }
}
void Stats::IncCommit() {
@@ -538,8 +895,10 @@ void Stats::IncCommit() {
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
- transaction_summary_.commit_message_count_times_list.push_back(
- std::chrono::system_clock::now());
+ if (enable_resview) {
+ transaction_summary_.commit_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
+ }
}
void Stats::IncPendingExecute() { pending_execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 849c83d1..a2524fc1 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -23,6 +23,7 @@
#include <chrono>
#include <future>
+#include <map>
#include <nlohmann/json.hpp>
#include "boost/asio.hpp"
@@ -33,6 +34,11 @@
#include "proto/kv/kv.pb.h"
#include "sys/resource.h"
+// Forward declaration for LevelDB
+namespace leveldb {
+class DB;
+}
+
namespace asio = boost::asio;
namespace beast = boost::beast;
using tcp = asio::ip::tcp;
@@ -91,6 +97,15 @@ class Stats {
bool IsFaulty();
void ChangePrimary(int primary_id);
+ // Timeline event recording (disk-based, only when resview enabled)
+ void RecordNetworkRecv(uint64_t seq);
+ void RecordPrepareRecv(uint64_t seq, int sender_id);
+ void RecordCommitRecv(uint64_t seq, int sender_id);
+ void RecordExecuteStart(uint64_t seq);
+ void RecordExecuteEnd(uint64_t seq);
+ void RecordResponseSent(uint64_t seq);
+ void CleanupOldTimelineEntries(); // Prevent timeline buffer memory leak
+
void AddLatency(uint64_t run_time);
void Monitor();
@@ -159,9 +174,20 @@ class Stats {
std::atomic<uint64_t> prev_num_prepare_;
std::atomic<uint64_t> prev_num_commit_;
nlohmann::json summary_json_;
- nlohmann::json consensus_history_;
std::unique_ptr<PrometheusHandler> prometheus_;
+
+ std::unique_ptr<leveldb::DB> summary_db_;
+ std::mutex summary_db_mutex_;
+
+ // In-memory timeline event buffer (per-seq) - batched writes to LevelDB
+ std::map<uint64_t, std::vector<nlohmann::json>> timeline_buffer_;
+ std::mutex timeline_buffer_mutex_;
+
+ // Timeline directory for per-transaction event logs
+ std::string timeline_dir_;
+ std::mutex timeline_mutex_;
+  void WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id = -1);
};
} // namespace resdb