This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new 6d4cb97d per sequence telemetry tracking
6d4cb97d is described below
commit 6d4cb97dc209abf32b87f58ef2cf446483053c8e
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 06:34:27 2026 +0000
per sequence telemetry tracking
---
.../consensus/execution/transaction_executor.cpp | 2 +-
platform/consensus/ordering/pbft/commitment.cpp | 14 +-
platform/statistic/stats.cpp | 1444 ++++++++++++--------
platform/statistic/stats.h | 400 ++++--
4 files changed, 1118 insertions(+), 742 deletions(-)
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index bb7cf6be..3e30d3f6 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -205,7 +205,7 @@ void TransactionExecutor::OrderMessage() {
}
void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
- global_stats_->IncCommit();
+ global_stats_->IncCommit(message->seq());
message->set_commit_time(GetCurrentTime());
execute_queue_.Push(std::move(message));
}
diff --git a/platform/consensus/ordering/pbft/commitment.cpp
b/platform/consensus/ordering/pbft/commitment.cpp
index 5a970dd6..7ebb769f 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -135,7 +135,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
return -2;
}
- global_stats_->RecordStateTime("request");
+ global_stats_->RecordStateTime(*seq, "request");
user_request->set_type(Request::TYPE_PRE_PREPARE);
user_request->set_current_view(message_manager_->GetCurrentView());
@@ -229,7 +229,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context>
context,
}
global_stats_->IncPropose();
- global_stats_->RecordStateTime("pre-prepare");
+ global_stats_->RecordStateTime(request->seq(), "pre-prepare");
std::unique_ptr<Request> prepare_request = resdb::NewRequest(
Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id());
prepare_request->clear_data();
@@ -263,9 +263,9 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context>
context,
}
return ret;
}
- global_stats_->IncPrepare();
uint64_t seq = request->seq();
int sender_id = request->sender_id();
+ global_stats_->IncPrepare(seq);
std::unique_ptr<Request> commit_request = resdb::NewRequest(
Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
commit_request->mutable_data_signature()->Clear();
@@ -289,7 +289,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context>
context,
// LOG(ERROR) << "sign hash"
// << commit_request->data_signature().DebugString();
}
- global_stats_->RecordStateTime("prepare");
+ global_stats_->RecordStateTime(seq, "prepare");
replica_communicator_->BroadCast(*commit_request);
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
@@ -309,7 +309,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
}
- global_stats_->IncCommit();
+ global_stats_->IncCommit(seq);
// Add request to message_manager.
// If it has received enough same requests(2f+1), message manager will
// commit the request.
@@ -318,7 +318,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
if (ret == CollectorResultCode::STATE_CHANGED) {
// LOG(ERROR)<<request->data().size();
// global_stats_->GetTransactionDetails(request->data());
- global_stats_->RecordStateTime("commit");
+ global_stats_->RecordStateTime(seq, "commit");
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
}
@@ -331,7 +331,7 @@ int Commitment::PostProcessExecutedMsg() {
if (batch_resp == nullptr) {
continue;
}
- global_stats_->SendSummary();
+ global_stats_->SendSummary(batch_resp->seq());
Request request;
request.set_hash(batch_resp->hash());
request.set_seq(batch_resp->seq());
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 1c511b3b..8957e733 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -17,589 +17,861 @@
* under the License.
*/
- #include "platform/statistic/stats.h"
-
- #include <glog/logging.h>
-
- #include <ctime>
-
- #include "common/utils/utils.h"
- #include "proto/kv/kv.pb.h"
-
- namespace asio = boost::asio;
- namespace beast = boost::beast;
- using tcp = asio::ip::tcp;
-
- namespace resdb {
-
- std::mutex g_mutex;
- Stats* Stats::GetGlobalStats(int seconds) {
- std::unique_lock<std::mutex> lk(g_mutex);
- static Stats stats(seconds);
- return &stats;
- } // gets a singelton instance of Stats Class
-
- Stats::Stats(int sleep_time) {
- monitor_sleep_time_ = sleep_time;
- #ifdef TEST_MODE
- monitor_sleep_time_ = 1;
- #endif
- num_call_ = 0;
- num_commit_ = 0;
- run_time_ = 0;
- run_call_ = 0;
- run_call_time_ = 0;
- server_call_ = 0;
- server_process_ = 0;
- run_req_num_ = 0;
- run_req_run_time_ = 0;
- seq_gap_ = 0;
- total_request_ = 0;
- total_geo_request_ = 0;
- geo_request_ = 0;
-
- stop_ = false;
- begin_ = false;
-
- socket_recv_ = 0;
- broad_cast_msg_ = 0;
- send_broad_cast_msg_ = 0;
-
- prometheus_ = nullptr;
- global_thread_ =
- std::thread(&Stats::MonitorGlobal, this); // pass by reference
-
- transaction_summary_.port = -1;
-
- // Setup websocket here
- make_faulty_.store(false);
- transaction_summary_.request_pre_prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.commit_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.execution_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.txn_number = 0;
- }
-
- void Stats::Stop() { stop_ = true; }
-
- Stats::~Stats() {
- stop_ = true;
- if (global_thread_.joinable()) {
- global_thread_.join();
- }
- if (enable_resview && crow_thread_.joinable()) {
- crow_thread_.join();
- }
- }
-
- int64_t GetRSS() {
- int64_t rss = 0;
- FILE* fp = NULL;
- if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
- return 0;
- }
-
- uint64_t size, resident, share, text, lib, data, dt;
- if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share,
&text,
- &lib, &data, &dt) != 7) {
- fclose(fp);
- return 0;
- }
- fclose(fp);
-
- int64_t page_size = sysconf(_SC_PAGESIZE);
- rss = resident * page_size;
-
- // Convert to MB
- rss = rss / (1024 * 1024);
-
- return rss;
- }
-
- void Stats::CrowRoute() {
- crow::SimpleApp app;
- while (!stop_) {
- try {
- CROW_ROUTE(app, "/consensus_data")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 1";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- // Send your response
- res.body = consensus_history_.dump();
- res.end();
- });
- CROW_ROUTE(app, "/get_status")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 2";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- // Send your response
- res.body = IsFaulty() ? "Faulty" : "Not Faulty";
- res.end();
- });
- CROW_ROUTE(app, "/make_faulty")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 3";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- // Send your response
- if (enable_faulty_switch_) {
- make_faulty_.store(!make_faulty_.load());
- }
- res.body = "Success";
- res.end();
- });
- CROW_ROUTE(app, "/transaction_data")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 4";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- nlohmann::json mem_view_json;
- int status =
- getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
- if (status == 0) {
- mem_view_json["resident_set_size"] = GetRSS();
- mem_view_json["max_resident_set_size"] =
- transaction_summary_.process_stats_.ru_maxrss;
- mem_view_json["num_reads"] =
- transaction_summary_.process_stats_.ru_inblock;
- mem_view_json["num_writes"] =
- transaction_summary_.process_stats_.ru_oublock;
- }
-
- mem_view_json["ext_cache_hit_ratio"] =
- transaction_summary_.ext_cache_hit_ratio_;
- mem_view_json["level_db_stats"] =
- transaction_summary_.level_db_stats_;
- mem_view_json["level_db_approx_mem_size"] =
- transaction_summary_.level_db_approx_mem_size_;
- res.body = mem_view_json.dump();
- mem_view_json.clear();
- res.end();
- });
- app.port(8500 + transaction_summary_.port).multithreaded().run();
- sleep(1);
- } catch (const std::exception& e) {
- }
- }
- app.stop();
- }
-
- bool Stats::IsFaulty() { return make_faulty_.load(); }
-
- void Stats::ChangePrimary(int primary_id) {
- transaction_summary_.primary_id = primary_id;
- make_faulty_.store(false);
- }
-
- void Stats::SetProps(int replica_id, std::string ip, int port,
- bool resview_flag, bool faulty_flag) {
- transaction_summary_.replica_id = replica_id;
- transaction_summary_.ip = ip;
- transaction_summary_.port = port;
- enable_resview = resview_flag;
- enable_faulty_switch_ = faulty_flag;
- if (resview_flag) {
- crow_thread_ = std::thread(&Stats::CrowRoute, this);
- }
- }
-
- void Stats::SetPrimaryId(int primary_id) {
- transaction_summary_.primary_id = primary_id;
- }
-
- void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
- std::string level_db_stats,
- std::string level_db_approx_mem_size) {
- transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
- transaction_summary_.level_db_stats_ = level_db_stats;
- transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
- }
-
- void Stats::RecordStateTime(std::string state) {
- if (!enable_resview) {
- return;
- }
- if (state == "request" || state == "pre-prepare") {
- transaction_summary_.request_pre_prepare_state_time =
- std::chrono::system_clock::now();
- } else if (state == "prepare") {
- transaction_summary_.prepare_state_time =
std::chrono::system_clock::now();
- } else if (state == "commit") {
- transaction_summary_.commit_state_time = std::chrono::system_clock::now();
- }
- }
-
- void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
- if (!enable_resview) {
- return;
- }
- transaction_summary_.txn_number = batch_request.seq();
- transaction_summary_.txn_command.clear();
- transaction_summary_.txn_key.clear();
- transaction_summary_.txn_value.clear();
- for (auto& sub_request : batch_request.user_requests()) {
- KVRequest kv_request;
- if (!kv_request.ParseFromString(sub_request.request().data())) {
- break;
- }
- if (kv_request.cmd() == KVRequest::SET) {
- transaction_summary_.txn_command.push_back("SET");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back(kv_request.value());
- } else if (kv_request.cmd() == KVRequest::GET) {
- transaction_summary_.txn_command.push_back("GET");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back("");
- } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
- transaction_summary_.txn_command.push_back("GETALLVALUES");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back("");
- } else if (kv_request.cmd() == KVRequest::GETRANGE) {
- transaction_summary_.txn_command.push_back("GETRANGE");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back(kv_request.value());
- }
- }
- }
-
- void Stats::SendSummary() {
- if (!enable_resview) {
- return;
- }
- transaction_summary_.execution_time = std::chrono::system_clock::now();
-
- // Convert Transaction Summary to JSON
- summary_json_["replica_id"] = transaction_summary_.replica_id;
- summary_json_["ip"] = transaction_summary_.ip;
- summary_json_["port"] = transaction_summary_.port;
- summary_json_["primary_id"] = transaction_summary_.primary_id;
- summary_json_["propose_pre_prepare_time"] =
- transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
- .count();
- summary_json_["prepare_time"] =
- transaction_summary_.prepare_state_time.time_since_epoch().count();
- summary_json_["commit_time"] =
- transaction_summary_.commit_state_time.time_since_epoch().count();
- summary_json_["execution_time"] =
- transaction_summary_.execution_time.time_since_epoch().count();
- for (size_t i = 0;
- i < transaction_summary_.prepare_message_count_times_list.size(); i++)
{
- summary_json_["prepare_message_timestamps"].push_back(
- transaction_summary_.prepare_message_count_times_list[i]
- .time_since_epoch()
- .count());
- }
- for (size_t i = 0;
- i < transaction_summary_.commit_message_count_times_list.size(); i++) {
- summary_json_["commit_message_timestamps"].push_back(
- transaction_summary_.commit_message_count_times_list[i]
- .time_since_epoch()
- .count());
- }
- summary_json_["txn_number"] = transaction_summary_.txn_number;
- for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
- summary_json_["txn_commands"].push_back(
- transaction_summary_.txn_command[i]);
- }
- for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
- summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
- }
- for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
- summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
- }
-
- summary_json_["ext_cache_hit_ratio"] =
- transaction_summary_.ext_cache_hit_ratio_;
- consensus_history_[std::to_string(transaction_summary_.txn_number)] =
- summary_json_;
-
- LOG(ERROR) << summary_json_.dump();
-
- // Reset Transaction Summary Parameters
- transaction_summary_.request_pre_prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.commit_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.execution_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_message_count_times_list.clear();
- transaction_summary_.commit_message_count_times_list.clear();
-
- summary_json_.clear();
- }
-
- void Stats::MonitorGlobal() {
- LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
-
- uint64_t seq_fail = 0;
- uint64_t client_call = 0, socket_recv = 0;
- uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit =
0,
- pending_execute = 0, execute = 0, execute_done = 0;
- uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
- uint64_t send_broad_cast_msg_per_rep = 0;
- uint64_t server_call = 0, server_process = 0;
- uint64_t seq_gap = 0;
- uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
-
- // ====== for client proxy ======
- uint64_t run_req_num = 0, run_req_run_time = 0;
-
- uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
- // =============================
-
- uint64_t last_seq_fail = 0;
- uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare =
0,
- last_num_commit = 0;
- uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
- uint64_t last_client_call = 0, last_socket_recv = 0;
- uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
- uint64_t last_send_broad_cast_msg_per_rep = 0;
- uint64_t last_server_call = 0, last_server_process = 0;
- uint64_t last_total_request = 0, last_total_geo_request = 0,
- last_geo_request = 0;
- uint64_t time = 0;
-
- while (!stop_) {
- sleep(monitor_sleep_time_);
- time += monitor_sleep_time_;
- seq_fail = seq_fail_;
- socket_recv = socket_recv_;
- client_call = client_call_;
- num_client_req = num_client_req_;
- num_propose = num_propose_;
- num_prepare = num_prepare_;
- num_commit = num_commit_;
- pending_execute = pending_execute_;
- execute = execute_;
- execute_done = execute_done_;
- broad_cast_msg = broad_cast_msg_;
- send_broad_cast_msg = send_broad_cast_msg_;
- send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
- server_call = server_call_;
- server_process = server_process_;
- seq_gap = seq_gap_;
- total_request = total_request_;
- total_geo_request = total_geo_request_;
- geo_request = geo_request_;
-
- run_req_num = run_req_num_;
- run_req_run_time = run_req_run_time_;
-
- LOG(ERROR) << "=========== monitor =========\n"
- << "server call:" << server_call - last_server_call
- << " server process:" << server_process - last_server_process
- << " socket recv:" << socket_recv - last_socket_recv
- << " "
- "client call:"
- << client_call - last_client_call
- << " "
- "client req:"
- << num_client_req - last_num_client_req
- << " "
- "broad_cast:"
- << broad_cast_msg - last_broad_cast_msg
- << " "
- "send broad_cast:"
- << send_broad_cast_msg - last_send_broad_cast_msg
- << " "
- "per send broad_cast:"
- << send_broad_cast_msg_per_rep -
last_send_broad_cast_msg_per_rep
- << " "
- "propose:"
- << num_propose - last_num_propose
- << " "
- "prepare:"
- << (num_prepare - last_num_prepare)
- << " "
- "commit:"
- << (num_commit - last_num_commit)
- << " "
- "pending execute:"
- << pending_execute - last_pending_execute
- << " "
- "execute:"
- << execute - last_execute
- << " "
- "execute done:"
- << execute_done - last_execute_done << " seq gap:" << seq_gap
- << " total request:" << total_request - last_total_request
- << " txn:" << (total_request - last_total_request) / 5
- << " total geo request:"
- << total_geo_request - last_total_geo_request
- << " total geo request per:"
- << (total_geo_request - last_total_geo_request) / 5
- << " geo request:" << (geo_request - last_geo_request)
- << " "
- "seq fail:"
- << seq_fail - last_seq_fail << " time:" << time
- << " "
- "\n--------------- monitor ------------";
- if (run_req_num - last_run_req_num > 0) {
- LOG(ERROR) << " req client latency:"
- << static_cast<double>(run_req_run_time -
- last_run_req_run_time) /
- (run_req_num - last_run_req_num) / 1000000000.0;
- }
-
- last_seq_fail = seq_fail;
- last_socket_recv = socket_recv;
- last_client_call = client_call;
- last_num_client_req = num_client_req;
- last_num_propose = num_propose;
- last_num_prepare = num_prepare;
- last_num_commit = num_commit;
- last_pending_execute = pending_execute;
- last_execute = execute;
- last_execute_done = execute_done;
-
- last_broad_cast_msg = broad_cast_msg;
- last_send_broad_cast_msg = send_broad_cast_msg;
- last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
-
- last_server_call = server_call;
- last_server_process = server_process;
-
- last_run_req_num = run_req_num;
- last_run_req_run_time = run_req_run_time;
- last_total_request = total_request;
- last_total_geo_request = total_geo_request;
- last_geo_request = geo_request;
- }
- }
-
- void Stats::IncClientCall() {
- if (prometheus_) {
- prometheus_->Inc(CLIENT_CALL, 1);
- }
- client_call_++;
- }
-
- void Stats::IncClientRequest() {
- if (prometheus_) {
- prometheus_->Inc(CLIENT_REQ, 1);
- }
- num_client_req_++;
- }
-
- void Stats::IncPropose() {
- if (prometheus_) {
- prometheus_->Inc(PROPOSE, 1);
- }
- num_propose_++;
- }
-
- void Stats::IncPrepare() {
- if (prometheus_) {
- prometheus_->Inc(PREPARE, 1);
- }
- num_prepare_++;
- transaction_summary_.prepare_message_count_times_list.push_back(
- std::chrono::system_clock::now());
- }
-
- void Stats::IncCommit() {
- if (prometheus_) {
- prometheus_->Inc(COMMIT, 1);
- }
- num_commit_++;
- transaction_summary_.commit_message_count_times_list.push_back(
- std::chrono::system_clock::now());
- }
-
- void Stats::IncPendingExecute() { pending_execute_++; }
-
- void Stats::IncExecute() { execute_++; }
-
- void Stats::IncExecuteDone() {
- if (prometheus_) {
- prometheus_->Inc(EXECUTE, 1);
- }
- execute_done_++;
- }
-
- void Stats::BroadCastMsg() {
- if (prometheus_) {
- prometheus_->Inc(BROAD_CAST, 1);
- }
- broad_cast_msg_++;
- }
-
- void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
-
- void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
-
- void Stats::SeqFail() { seq_fail_++; }
-
- void Stats::IncTotalRequest(uint32_t num) {
- if (prometheus_) {
- prometheus_->Inc(NUM_EXECUTE_TX, num);
- }
- total_request_ += num;
- }
-
- void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
-
- void Stats::IncGeoRequest() { geo_request_++; }
-
- void Stats::ServerCall() {
- if (prometheus_) {
- prometheus_->Inc(SERVER_CALL_NAME, 1);
- }
- server_call_++;
- }
-
- void Stats::ServerProcess() {
- if (prometheus_) {
- prometheus_->Inc(SERVER_PROCESS, 1);
- }
- server_process_++;
- }
-
- void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
-
- void Stats::AddLatency(uint64_t run_time) {
- run_req_num_++;
- run_req_run_time_ += run_time;
- }
-
- void Stats::SetPrometheus(const std::string& prometheus_address) {
- prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
- }
-
- } // namespace resdb
\ No newline at end of file
+#include "platform/statistic/stats.h"
+
+#include <glog/logging.h>
+
+#include <ctime>
+
+#include "common/utils/utils.h"
+#include "proto/kv/kv.pb.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
+
+namespace resdb {
+
+std::mutex g_mutex;
+Stats* Stats::GetGlobalStats(int seconds) {
+ std::unique_lock<std::mutex> lk(g_mutex);
+ static Stats stats(seconds);
+ return &stats;
+} // gets a singelton instance of Stats Class
+
+Stats::Stats(int sleep_time) {
+ monitor_sleep_time_ = sleep_time;
+#ifdef TEST_MODE
+ monitor_sleep_time_ = 1;
+#endif
+ num_call_ = 0;
+ num_commit_ = 0;
+ run_time_ = 0;
+ run_call_ = 0;
+ run_call_time_ = 0;
+ server_call_ = 0;
+ server_process_ = 0;
+ run_req_num_ = 0;
+ run_req_run_time_ = 0;
+ seq_gap_ = 0;
+ total_request_ = 0;
+ total_geo_request_ = 0;
+ geo_request_ = 0;
+
+ stop_ = false;
+ begin_ = false;
+
+ socket_recv_ = 0;
+ broad_cast_msg_ = 0;
+ send_broad_cast_msg_ = 0;
+
+ prometheus_ = nullptr;
+ global_thread_ =
+ std::thread(&Stats::MonitorGlobal, this); // pass by reference
+
+ transaction_summary_.port = -1;
+
+ // Setup websocket here
+ make_faulty_.store(false);
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.commit_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.execution_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.txn_number = 0;
+
+ // Initialize static telemetry info
+ static_telemetry_info_.port = -1;
+}
+
+void Stats::Stop() { stop_ = true; }
+
+Stats::~Stats() {
+ stop_ = true;
+ if (global_thread_.joinable()) {
+ global_thread_.join();
+ }
+ if (enable_resview && crow_thread_.joinable()) {
+ crow_thread_.join();
+ }
+}
+
+int64_t GetRSS() {
+ int64_t rss = 0;
+ FILE* fp = NULL;
+ if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
+ return 0;
+ }
+
+ uint64_t size, resident, share, text, lib, data, dt;
+ if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share,
&text,
+ &lib, &data, &dt) != 7) {
+ fclose(fp);
+ return 0;
+ }
+ fclose(fp);
+
+ int64_t page_size = sysconf(_SC_PAGESIZE);
+ rss = resident * page_size;
+
+ // Convert to MB
+ rss = rss / (1024 * 1024);
+
+ return rss;
+}
+
+void Stats::CrowRoute() {
+ crow::SimpleApp app;
+ while (!stop_) {
+ try {
+ CROW_ROUTE(app, "/consensus_data")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 1";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ // Send your response
+ {
+ std::lock_guard<std::mutex> lock(consensus_history_mutex_);
+ res.body = consensus_history_.dump();
+ }
+ res.end();
+ });
+ CROW_ROUTE(app, "/get_status")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 2";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ // Send your response
+ res.body = IsFaulty() ? "Faulty" : "Not Faulty";
+ res.end();
+ });
+ CROW_ROUTE(app, "/make_faulty")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 3";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ // Send your response
+ if (enable_faulty_switch_) {
+ make_faulty_.store(!make_faulty_.load());
+ }
+ res.body = "Success";
+ res.end();
+ });
+ CROW_ROUTE(app, "/transaction_data")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 4";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ nlohmann::json mem_view_json;
+ int status =
+ getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
+ if (status == 0) {
+ mem_view_json["resident_set_size"] = GetRSS();
+ mem_view_json["max_resident_set_size"] =
+ transaction_summary_.process_stats_.ru_maxrss;
+ mem_view_json["num_reads"] =
+ transaction_summary_.process_stats_.ru_inblock;
+ mem_view_json["num_writes"] =
+ transaction_summary_.process_stats_.ru_oublock;
+ }
+
+ mem_view_json["ext_cache_hit_ratio"] =
+ transaction_summary_.ext_cache_hit_ratio_;
+ mem_view_json["level_db_stats"] =
+ transaction_summary_.level_db_stats_;
+ mem_view_json["level_db_approx_mem_size"] =
+ transaction_summary_.level_db_approx_mem_size_;
+ res.body = mem_view_json.dump();
+ mem_view_json.clear();
+ res.end();
+ });
+ app.port(8500 + transaction_summary_.port).multithreaded().run();
+ sleep(1);
+ } catch (const std::exception& e) {
+ }
+ }
+ app.stop();
+}
+
+bool Stats::IsFaulty() { return make_faulty_.load(); }
+
+void Stats::ChangePrimary(int primary_id) {
+ transaction_summary_.primary_id = primary_id;
+ make_faulty_.store(false);
+}
+
+void Stats::SetProps(int replica_id, std::string ip, int port,
+ bool resview_flag, bool faulty_flag) {
+ transaction_summary_.replica_id = replica_id;
+ transaction_summary_.ip = ip;
+ transaction_summary_.port = port;
+ // Also set static telemetry info for new transactions
+ static_telemetry_info_.replica_id = replica_id;
+ static_telemetry_info_.ip = ip;
+ static_telemetry_info_.port = port;
+ enable_resview = resview_flag;
+ enable_faulty_switch_ = faulty_flag;
+ if (resview_flag) {
+ crow_thread_ = std::thread(&Stats::CrowRoute, this);
+ }
+}
+
+void Stats::SetPrimaryId(int primary_id) {
+ transaction_summary_.primary_id = primary_id;
+ static_telemetry_info_.primary_id = primary_id;
+}
+
+void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
+ std::string level_db_stats,
+ std::string level_db_approx_mem_size) {
+ transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
+ transaction_summary_.level_db_stats_ = level_db_stats;
+ transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+ static_telemetry_info_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
+ static_telemetry_info_.level_db_stats_ = level_db_stats;
+ static_telemetry_info_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+}
+
+size_t Stats::GetShardIndex(uint64_t seq) const {
+ return seq % kTelemetryShards;
+}
+
+TransactionTelemetry& Stats::GetOrCreateTelemetry(uint64_t seq) {
+ size_t shard = GetShardIndex(seq);
+ std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+ auto& map = transaction_telemetry_map_[shard];
+ auto it = map.find(seq);
+ if (it == map.end()) {
+ // Initialize with static info - only copy static fields, preserve
+ // timestamps as min
+ TransactionTelemetry telemetry;
+ telemetry.replica_id = static_telemetry_info_.replica_id;
+ telemetry.primary_id = static_telemetry_info_.primary_id;
+ telemetry.ip = static_telemetry_info_.ip;
+ telemetry.port = static_telemetry_info_.port;
+ telemetry.ext_cache_hit_ratio_ =
+ static_telemetry_info_.ext_cache_hit_ratio_;
+ telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+ telemetry.level_db_approx_mem_size_ =
+ static_telemetry_info_.level_db_approx_mem_size_;
+ telemetry.txn_number = seq;
+ // Timestamps remain at min() which is correct
+ it = map.emplace(seq, std::move(telemetry)).first;
+ }
+ return it->second;
+}
+
+void Stats::RecordStateTime(std::string state) {
+ // Old method without seq - deprecated, but keep for backwards compatibility
+ // This uses the old single transaction_summary_ approach
+ if (!enable_resview) {
+ return;
+ }
+ // Temporary fix: protect access to transaction_summary_
+ std::lock_guard<std::mutex> lock(
+ telemetry_mutex_[0]); // Use first shard for old code
+ if (state == "request" || state == "pre-prepare") {
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::now();
+ } else if (state == "prepare") {
+ transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
+ } else if (state == "commit") {
+ transaction_summary_.commit_state_time = std::chrono::system_clock::now();
+ }
+}
+
+// Records the wall-clock time at which sequence `seq` entered the given
+// consensus state ("request"/"pre-prepare", "prepare", or "commit").
+// Lazily creates the per-sequence telemetry entry on first touch, copying
+// the static replica info. No-op unless resview telemetry is enabled;
+// unrecognized state names are silently ignored.
+// NOTE(review): this lazy-init block is duplicated in IncPrepare(seq) and
+// IncCommit(seq); consider routing all three through GetOrCreateTelemetry().
+// `state` is taken by value to match the header declaration.
+void Stats::RecordStateTime(uint64_t seq, std::string state) {
+  if (!enable_resview) {
+    return;
+  }
+
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    // Initialize with static info - DO NOT reset existing entry
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ =
+        static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ =
+        static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    // Timestamps start at min() - correct
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  // Entry exists - update timestamp only, don't touch other fields
+  auto& telemetry = it->second;
+  auto now = std::chrono::system_clock::now();
+
+  if (state == "request" || state == "pre-prepare") {
+    telemetry.request_pre_prepare_state_time = now;
+  } else if (state == "prepare") {
+    telemetry.prepare_state_time = now;
+  } else if (state == "commit") {
+    telemetry.commit_state_time = now;
+  }
+}
+
+// Parses the KV commands/keys/values out of a batch request and stores them
+// in the telemetry entry for batch_request.seq(), creating the entry if it
+// does not exist yet. Only the txn detail fields are (re)written; timestamp
+// fields are never touched here. No-op unless resview telemetry is enabled.
+// NOTE(review): batch_request is taken by value (one protobuf copy per call)
+// to match the header declaration; consider const& in a follow-up.
+void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+  if (!enable_resview) {
+    return;
+  }
+
+  uint64_t seq = batch_request.seq();
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  bool entry_existed = (it != map.end());
+
+  if (!entry_existed) {
+    // Initialize with static info - timestamps start at min() which is correct
+    // They will be set by RecordStateTime() calls later
+    TransactionTelemetry telemetry;
+    telemetry.replica_id = static_telemetry_info_.replica_id;
+    telemetry.primary_id = static_telemetry_info_.primary_id;
+    telemetry.ip = static_telemetry_info_.ip;
+    telemetry.port = static_telemetry_info_.port;
+    telemetry.ext_cache_hit_ratio_ =
+        static_telemetry_info_.ext_cache_hit_ratio_;
+    telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+    telemetry.level_db_approx_mem_size_ =
+        static_telemetry_info_.level_db_approx_mem_size_;
+    telemetry.txn_number = seq;
+    // Timestamps remain at min() - will be updated by RecordStateTime() if
+    // needed
+    it = map.emplace(seq, std::move(telemetry)).first;
+  }
+
+  // Entry exists (either just created or already existed)
+  // CRITICAL: Only update transaction details fields, NEVER touch timestamp
+  // fields
+  auto& telemetry = it->second;
+  telemetry.txn_number = seq;
+
+  // Only update transaction detail fields - timestamps are NEVER modified here
+  telemetry.txn_command.clear();
+  telemetry.txn_key.clear();
+  telemetry.txn_value.clear();
+
+  // NOTE(review): a sub-request that fails to parse aborts processing of ALL
+  // remaining sub-requests (break, not continue) - confirm this is intended.
+  // Commands other than the four below are silently skipped.
+  for (auto& sub_request : batch_request.user_requests()) {
+    KVRequest kv_request;
+    if (!kv_request.ParseFromString(sub_request.request().data())) {
+      break;
+    }
+
+    if (kv_request.cmd() == KVRequest::SET) {
+      telemetry.txn_command.push_back("SET");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back(kv_request.value());
+    } else if (kv_request.cmd() == KVRequest::GET) {
+      telemetry.txn_command.push_back("GET");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back("");
+    } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
+      telemetry.txn_command.push_back("GETALLVALUES");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back("");
+    } else if (kv_request.cmd() == KVRequest::GETRANGE) {
+      telemetry.txn_command.push_back("GETRANGE");
+      telemetry.txn_key.push_back(kv_request.key());
+      telemetry.txn_value.push_back(kv_request.value());
+    }
+  }
+
+  // Timestamps (request_pre_prepare_state_time, prepare_state_time,
+  // commit_state_time, execution_time, prepare_message_count_times_list,
+  // commit_message_count_times_list) are NEVER modified in this function -
+  // they are only set by RecordStateTime(), IncPrepare(), IncCommit(), and
+  // SendSummary()
+}
+
+// Deprecated overload: serializes the single shared transaction_summary_ to
+// JSON, archives it in consensus_history_ keyed by txn_number, logs it, then
+// resets the summary's timestamps/lists for the next transaction. Prefer
+// SendSummary(seq). No-op unless resview telemetry is enabled.
+void Stats::SendSummary() {
+  // Old method without seq - deprecated, but keep for backwards compatibility
+  // This uses the old single transaction_summary_ approach
+  if (!enable_resview) {
+    return;
+  }
+  std::lock_guard<std::mutex> lock(
+      telemetry_mutex_[0]); // Use first shard for old code
+
+  transaction_summary_.execution_time = std::chrono::system_clock::now();
+
+  // Convert Transaction Summary to JSON (old manual way)
+  summary_json_["replica_id"] = transaction_summary_.replica_id;
+  summary_json_["ip"] = transaction_summary_.ip;
+  summary_json_["port"] = transaction_summary_.port;
+  summary_json_["primary_id"] = transaction_summary_.primary_id;
+  summary_json_["propose_pre_prepare_time"] =
+      transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+          .count();
+  summary_json_["prepare_time"] =
+      transaction_summary_.prepare_state_time.time_since_epoch().count();
+  summary_json_["commit_time"] =
+      transaction_summary_.commit_state_time.time_since_epoch().count();
+  summary_json_["execution_time"] =
+      transaction_summary_.execution_time.time_since_epoch().count();
+  for (size_t i = 0;
+       i < transaction_summary_.prepare_message_count_times_list.size(); i++) {
+    summary_json_["prepare_message_timestamps"].push_back(
+        transaction_summary_.prepare_message_count_times_list[i]
+            .time_since_epoch()
+            .count());
+  }
+  for (size_t i = 0;
+       i < transaction_summary_.commit_message_count_times_list.size(); i++) {
+    summary_json_["commit_message_timestamps"].push_back(
+        transaction_summary_.commit_message_count_times_list[i]
+            .time_since_epoch()
+            .count());
+  }
+  summary_json_["txn_number"] = transaction_summary_.txn_number;
+  for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
+    summary_json_["txn_commands"].push_back(
+        transaction_summary_.txn_command[i]);
+  }
+  for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
+    summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
+  }
+  for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
+    summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
+  }
+
+  summary_json_["ext_cache_hit_ratio"] =
+      transaction_summary_.ext_cache_hit_ratio_;
+
+  {
+    std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
+    consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+        summary_json_;
+  }
+
+  // NOTE(review): ERROR severity for a routine telemetry dump looks too
+  // high - consider LOG(INFO)/VLOG if this is not an actual error path.
+  LOG(ERROR) << summary_json_.dump();
+
+  // Reset Transaction Summary Parameters
+  transaction_summary_.request_pre_prepare_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.prepare_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.commit_state_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.execution_time =
+      std::chrono::system_clock::time_point::min();
+  transaction_summary_.prepare_message_count_times_list.clear();
+  transaction_summary_.commit_message_count_times_list.clear();
+
+  summary_json_.clear();
+}
+
+// Serializes the telemetry for sequence `seq` to JSON, records/merges it into
+// consensus_history_, and logs it. execution_time is stamped here just before
+// serialization. Warns and returns if the sequence was never tracked.
+// No-op unless resview telemetry is enabled.
+void Stats::SendSummary(uint64_t seq) {
+  if (!enable_resview) {
+    return;
+  }
+
+  size_t shard = GetShardIndex(seq);
+  std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+  auto& map = transaction_telemetry_map_[shard];
+  auto it = map.find(seq);
+  if (it == map.end()) {
+    LOG(WARNING) << "SendSummary called for unknown transaction: " << seq;
+    return;
+  }
+
+  auto& telemetry = it->second;
+
+  // CRITICAL: Set execution_time before serialization, but don't modify any
+  // other fields
+  telemetry.execution_time = std::chrono::system_clock::now();
+
+  // Use automatic JSON serialization - this reads current state, doesn't
+  // modify it
+  nlohmann::json summary_json = telemetry.to_json();
+
+  {
+    std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
+    // Only update if we don't already have this sequence, or update existing
+    // This prevents overwriting good data with bad data
+    std::string seq_str = std::to_string(seq);
+    if (consensus_history_.find(seq_str) == consensus_history_.end()) {
+      // First time storing - always store
+      consensus_history_[seq_str] = summary_json;
+    } else {
+      // Already exists - only update if new data has more complete information
+      // (i.e., has transaction details or more timestamps)
+      auto& existing = consensus_history_[seq_str];
+      bool new_has_details = !summary_json["txn_commands"].empty();
+      bool existing_has_details = !existing["txn_commands"].empty();
+
+      // If new data has details and existing doesn't, merge them
+      if (new_has_details && !existing_has_details) {
+        existing["txn_commands"] = summary_json["txn_commands"];
+        existing["txn_keys"] = summary_json["txn_keys"];
+        existing["txn_values"] = summary_json["txn_values"];
+      }
+
+      // Always update execution_time as it's the latest
+      existing["execution_time"] = summary_json["execution_time"];
+    }
+  }
+
+  // NOTE(review): ERROR severity for a routine telemetry dump looks too
+  // high - consider LOG(INFO)/VLOG if this is not an actual error path.
+  LOG(ERROR) << summary_json.dump();
+
+  // DON'T delete the entry - per-sequence state means each seq has its own
+  // entry and it will naturally be reused or cleaned up later if needed
+  // Deleting causes issues if SendSummary() is called multiple times or
+  // if telemetry updates happen after sending
+  // NOTE(review): with no eviction anywhere in the visible code, both
+  // transaction_telemetry_map_ and consensus_history_ grow without bound as
+  // sequences advance - a long-running replica will leak memory. A capped
+  // history or periodic pruning of summarized sequences is needed.
+}
+
+// Background monitor loop: every monitor_sleep_time_ seconds, snapshot the
+// atomic global counters, log the per-interval deltas, then remember the
+// snapshot for the next round. Runs until Stop() sets stop_. Counters are
+// std::atomic, so the unlocked reads here are safe.
+// Fix: per-second throughput ("txn:", "total geo request per:") is now
+// normalized by the configured interval instead of a hard-coded 5, so the
+// logged rates stay correct for any monitor_sleep_time_ setting.
+void Stats::MonitorGlobal() {
+  LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
+
+  // Guard against a zero interval so the rate divisions below are safe.
+  const uint64_t interval =
+      monitor_sleep_time_ > 0 ? static_cast<uint64_t>(monitor_sleep_time_) : 1;
+
+  uint64_t seq_fail = 0;
+  uint64_t client_call = 0, socket_recv = 0;
+  uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0,
+           num_commit = 0, pending_execute = 0, execute = 0, execute_done = 0;
+  uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
+  uint64_t send_broad_cast_msg_per_rep = 0;
+  uint64_t server_call = 0, server_process = 0;
+  uint64_t seq_gap = 0;
+  uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
+
+  // ====== for client proxy ======
+  uint64_t run_req_num = 0, run_req_run_time = 0;
+
+  uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
+  // =============================
+
+  // Previous-interval snapshots used to compute deltas.
+  uint64_t last_seq_fail = 0;
+  uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
+           last_num_commit = 0;
+  uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
+  uint64_t last_client_call = 0, last_socket_recv = 0;
+  uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
+  uint64_t last_send_broad_cast_msg_per_rep = 0;
+  uint64_t last_server_call = 0, last_server_process = 0;
+  uint64_t last_total_request = 0, last_total_geo_request = 0,
+           last_geo_request = 0;
+  uint64_t time = 0;
+
+  while (!stop_) {
+    sleep(monitor_sleep_time_);
+    time += monitor_sleep_time_;
+    // Snapshot all atomics once per interval so the logged deltas are
+    // mutually consistent (within one read pass).
+    seq_fail = seq_fail_;
+    socket_recv = socket_recv_;
+    client_call = client_call_;
+    num_client_req = num_client_req_;
+    num_propose = num_propose_;
+    num_prepare = num_prepare_;
+    num_commit = num_commit_;
+    pending_execute = pending_execute_;
+    execute = execute_;
+    execute_done = execute_done_;
+    broad_cast_msg = broad_cast_msg_;
+    send_broad_cast_msg = send_broad_cast_msg_;
+    send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
+    server_call = server_call_;
+    server_process = server_process_;
+    seq_gap = seq_gap_;
+    total_request = total_request_;
+    total_geo_request = total_geo_request_;
+    geo_request = geo_request_;
+
+    run_req_num = run_req_num_;
+    run_req_run_time = run_req_run_time_;
+
+    LOG(ERROR) << "=========== monitor =========\n"
+               << "server call:" << server_call - last_server_call
+               << " server process:" << server_process - last_server_process
+               << " socket recv:" << socket_recv - last_socket_recv
+               << " client call:" << client_call - last_client_call
+               << " client req:" << num_client_req - last_num_client_req
+               << " broad_cast:" << broad_cast_msg - last_broad_cast_msg
+               << " send broad_cast:"
+               << send_broad_cast_msg - last_send_broad_cast_msg
+               << " per send broad_cast:"
+               << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
+               << " propose:" << num_propose - last_num_propose
+               << " prepare:" << (num_prepare - last_num_prepare)
+               << " commit:" << (num_commit - last_num_commit)
+               << " pending execute:" << pending_execute - last_pending_execute
+               << " execute:" << execute - last_execute
+               << " execute done:" << execute_done - last_execute_done
+               << " seq gap:" << seq_gap
+               << " total request:" << total_request - last_total_request
+               // Normalize by the actual interval, not a hard-coded 5s.
+               << " txn:" << (total_request - last_total_request) / interval
+               << " total geo request:"
+               << total_geo_request - last_total_geo_request
+               << " total geo request per:"
+               << (total_geo_request - last_total_geo_request) / interval
+               << " geo request:" << (geo_request - last_geo_request)
+               << " seq fail:" << seq_fail - last_seq_fail << " time:" << time
+               << " \n--------------- monitor ------------";
+    if (run_req_num - last_run_req_num > 0) {
+      // Average client-observed latency over the interval, in seconds
+      // (run_req_run_time_ accumulates nanoseconds via AddLatency()).
+      LOG(ERROR) << " req client latency:"
+                 << static_cast<double>(run_req_run_time -
+                                        last_run_req_run_time) /
+                        (run_req_num - last_run_req_num) / 1000000000.0;
+    }
+
+    last_seq_fail = seq_fail;
+    last_socket_recv = socket_recv;
+    last_client_call = client_call;
+    last_num_client_req = num_client_req;
+    last_num_propose = num_propose;
+    last_num_prepare = num_prepare;
+    last_num_commit = num_commit;
+    last_pending_execute = pending_execute;
+    last_execute = execute;
+    last_execute_done = execute_done;
+
+    last_broad_cast_msg = broad_cast_msg;
+    last_send_broad_cast_msg = send_broad_cast_msg;
+    last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
+
+    last_server_call = server_call;
+    last_server_process = server_process;
+
+    last_run_req_num = run_req_num;
+    last_run_req_run_time = run_req_run_time;
+    last_total_request = total_request;
+    last_total_geo_request = total_geo_request;
+    last_geo_request = geo_request;
+  }
+}
+
+// Counts one client RPC arrival (Prometheus gauge + local monitor counter).
+void Stats::IncClientCall() {
+  if (prometheus_) {
+    prometheus_->Inc(CLIENT_CALL, 1);
+  }
+  client_call_++;
+}
+
+// Counts one client request entering the pipeline.
+void Stats::IncClientRequest() {
+  if (prometheus_) {
+    prometheus_->Inc(CLIENT_REQ, 1);
+  }
+  num_client_req_++;
+}
+
+// Counts one proposal (pre-prepare) being issued.
+void Stats::IncPropose() {
+  if (prometheus_) {
+    prometheus_->Inc(PROPOSE, 1);
+  }
+  num_propose_++;
+}
+
+// Deprecated overload: counts a prepare message and appends its timestamp to
+// the single shared transaction_summary_. Prefer IncPrepare(seq).
+void Stats::IncPrepare() {
+  if (prometheus_) {
+    prometheus_->Inc(PREPARE, 1);
+  }
+  num_prepare_++;
+  // Old method without seq - deprecated
+  if (enable_resview) {
+    std::lock_guard<std::mutex> lock(
+        telemetry_mutex_[0]); // Use first shard for old code
+    transaction_summary_.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Counts one prepare message for sequence `seq` and, when resview telemetry
+// is enabled, appends the arrival timestamp to that sequence's telemetry
+// entry (created lazily from the static replica info on first touch).
+// NOTE(review): the lazy-init below duplicates RecordStateTime(seq)/
+// IncCommit(seq) - consider sharing GetOrCreateTelemetry().
+void Stats::IncPrepare(uint64_t seq) {
+  if (prometheus_) {
+    prometheus_->Inc(PREPARE, 1);
+  }
+  num_prepare_++;
+  if (enable_resview) {
+    size_t shard = GetShardIndex(seq);
+    std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+    auto& map = transaction_telemetry_map_[shard];
+    auto it = map.find(seq);
+    if (it == map.end()) {
+      TransactionTelemetry telemetry;
+      telemetry.replica_id = static_telemetry_info_.replica_id;
+      telemetry.primary_id = static_telemetry_info_.primary_id;
+      telemetry.ip = static_telemetry_info_.ip;
+      telemetry.port = static_telemetry_info_.port;
+      telemetry.ext_cache_hit_ratio_ =
+          static_telemetry_info_.ext_cache_hit_ratio_;
+      telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+      telemetry.level_db_approx_mem_size_ =
+          static_telemetry_info_.level_db_approx_mem_size_;
+      telemetry.txn_number = seq;
+      it = map.emplace(seq, std::move(telemetry)).first;
+    }
+
+    it->second.prepare_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Deprecated overload: counts a commit message and appends its timestamp to
+// the single shared transaction_summary_. Prefer IncCommit(seq).
+void Stats::IncCommit() {
+  if (prometheus_) {
+    prometheus_->Inc(COMMIT, 1);
+  }
+  num_commit_++;
+  // Old method without seq - deprecated
+  if (enable_resview) {
+    std::lock_guard<std::mutex> lock(
+        telemetry_mutex_[0]); // Use first shard for old code
+    transaction_summary_.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// Counts one commit message for sequence `seq` and, when resview telemetry
+// is enabled, appends the arrival timestamp to that sequence's telemetry
+// entry (created lazily from the static replica info on first touch).
+// NOTE(review): lazy-init duplicated from IncPrepare(seq)/RecordStateTime.
+void Stats::IncCommit(uint64_t seq) {
+  if (prometheus_) {
+    prometheus_->Inc(COMMIT, 1);
+  }
+  num_commit_++;
+  if (enable_resview) {
+    size_t shard = GetShardIndex(seq);
+    std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+
+    auto& map = transaction_telemetry_map_[shard];
+    auto it = map.find(seq);
+    if (it == map.end()) {
+      TransactionTelemetry telemetry;
+      telemetry.replica_id = static_telemetry_info_.replica_id;
+      telemetry.primary_id = static_telemetry_info_.primary_id;
+      telemetry.ip = static_telemetry_info_.ip;
+      telemetry.port = static_telemetry_info_.port;
+      telemetry.ext_cache_hit_ratio_ =
+          static_telemetry_info_.ext_cache_hit_ratio_;
+      telemetry.level_db_stats_ = static_telemetry_info_.level_db_stats_;
+      telemetry.level_db_approx_mem_size_ =
+          static_telemetry_info_.level_db_approx_mem_size_;
+      telemetry.txn_number = seq;
+      it = map.emplace(seq, std::move(telemetry)).first;
+    }
+
+    it->second.commit_message_count_times_list.push_back(
+        std::chrono::system_clock::now());
+  }
+}
+
+// --- Simple atomic counter mutators for the global monitor (and, where a
+// --- Prometheus handler is configured, the corresponding exported metric).
+void Stats::IncPendingExecute() { pending_execute_++; }
+
+void Stats::IncExecute() { execute_++; }
+
+void Stats::IncExecuteDone() {
+  if (prometheus_) {
+    prometheus_->Inc(EXECUTE, 1);
+  }
+  execute_done_++;
+}
+
+void Stats::BroadCastMsg() {
+  if (prometheus_) {
+    prometheus_->Inc(BROAD_CAST, 1);
+  }
+  broad_cast_msg_++;
+}
+
+// `num` is the number of replicas the broadcast was fanned out to.
+void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
+
+void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
+
+void Stats::SeqFail() { seq_fail_++; }
+
+void Stats::IncTotalRequest(uint32_t num) {
+  if (prometheus_) {
+    prometheus_->Inc(NUM_EXECUTE_TX, num);
+  }
+  total_request_ += num;
+}
+
+void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
+
+void Stats::IncGeoRequest() { geo_request_++; }
+
+void Stats::ServerCall() {
+  if (prometheus_) {
+    prometheus_->Inc(SERVER_CALL_NAME, 1);
+  }
+  server_call_++;
+}
+
+void Stats::ServerProcess() {
+  if (prometheus_) {
+    prometheus_->Inc(SERVER_PROCESS, 1);
+  }
+  server_process_++;
+}
+
+// Records the current gap between committed and executed sequences
+// (overwrites, does not accumulate).
+void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
+
+// Accumulates one client request's end-to-end run time (nanoseconds,
+// per the latency division in MonitorGlobal()).
+void Stats::AddLatency(uint64_t run_time) {
+  run_req_num_++;
+  run_req_run_time_ += run_time;
+}
+
+// Lazily constructs the Prometheus exporter; counters before this call are
+// only tracked locally.
+void Stats::SetPrometheus(const std::string& prometheus_address) {
+  prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
+}
+
+} // namespace resdb
\ No newline at end of file
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index c0ad30a7..4a0462bd 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -17,151 +17,255 @@
* under the License.
*/
- #pragma once
-
- #include <crow.h>
-
- #include <chrono>
- #include <future>
- #include <nlohmann/json.hpp>
-
- #include "boost/asio.hpp"
- #include "boost/beast.hpp"
- #include "platform/common/network/tcp_socket.h"
- #include "platform/proto/resdb.pb.h"
- #include "platform/statistic/prometheus_handler.h"
- #include "proto/kv/kv.pb.h"
- #include "sys/resource.h"
-
- namespace asio = boost::asio;
- namespace beast = boost::beast;
- using tcp = asio::ip::tcp;
-
- namespace resdb {
-
- struct VisualData {
- // Set when initializing
- int replica_id;
- int primary_id;
- std::string ip;
- int port;
-
- // Set when new txn is received
- int txn_number;
- std::vector<std::string> txn_command;
- std::vector<std::string> txn_key;
- std::vector<std::string> txn_value;
-
- // Request state if primary_id==replica_id, pre_prepare state otherwise
- std::chrono::system_clock::time_point request_pre_prepare_state_time;
- std::chrono::system_clock::time_point prepare_state_time;
- std::vector<std::chrono::system_clock::time_point>
- prepare_message_count_times_list;
- std::chrono::system_clock::time_point commit_state_time;
- std::vector<std::chrono::system_clock::time_point>
- commit_message_count_times_list;
- std::chrono::system_clock::time_point execution_time;
-
- // Storage Engine Stats
- double ext_cache_hit_ratio_;
- std::string level_db_stats_;
- std::string level_db_approx_mem_size_;
-
- // process stats
- struct rusage process_stats_;
- };
-
- class Stats {
- public:
- static Stats* GetGlobalStats(int sleep_seconds = 5);
-
- void Stop();
-
- void RetrieveProgress();
- void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
- bool faulty_flag);
- void SetPrimaryId(int primary_id);
- void SetStorageEngineMetrics(double ext_cache_hit_ratio,
- std::string level_db_stats,
- std::string level_db_approx_mem_size);
- void RecordStateTime(std::string state);
- void GetTransactionDetails(BatchUserRequest batch_request);
- void SendSummary();
- void CrowRoute();
- bool IsFaulty();
- void ChangePrimary(int primary_id);
-
- void AddLatency(uint64_t run_time);
-
- void Monitor();
- void MonitorGlobal();
-
- void IncSocketRecv();
-
- void IncClientCall();
-
- void IncClientRequest();
- void IncPropose();
- void IncPrepare();
- void IncCommit();
- void IncPendingExecute();
- void IncExecute();
- void IncExecuteDone();
-
- void BroadCastMsg();
- void SendBroadCastMsg(uint32_t num);
- void SendBroadCastMsgPerRep();
- void SeqFail();
- void IncTotalRequest(uint32_t num);
- void IncTotalGeoRequest(uint32_t num);
- void IncGeoRequest();
-
- void SeqGap(uint64_t seq_gap);
- // Network in->worker
- void ServerCall();
- void ServerProcess();
- void SetPrometheus(const std::string& prometheus_address);
-
- protected:
- Stats(int sleep_time = 5);
- ~Stats();
-
- private:
- std::string monitor_port_ = "default";
- std::string name_;
- std::atomic<int> num_call_, run_call_;
- std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
- std::thread thread_;
- std::atomic<bool> begin_;
- std::atomic<bool> stop_;
- std::condition_variable cv_;
- std::mutex mutex_;
-
- std::thread global_thread_;
- std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
- num_commit_, pending_execute_, execute_, execute_done_;
- std::atomic<uint64_t> client_call_, socket_recv_;
- std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
- send_broad_cast_msg_per_rep_;
- std::atomic<uint64_t> seq_fail_;
- std::atomic<uint64_t> server_call_, server_process_;
- std::atomic<uint64_t> run_req_num_;
- std::atomic<uint64_t> run_req_run_time_;
- std::atomic<uint64_t> seq_gap_;
- std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
- int monitor_sleep_time_ = 5; // default 5s.
-
- std::thread crow_thread_;
- bool enable_resview;
- bool enable_faulty_switch_;
- VisualData transaction_summary_;
- std::atomic<bool> make_faulty_;
- std::atomic<uint64_t> prev_num_prepare_;
- std::atomic<uint64_t> prev_num_commit_;
- nlohmann::json summary_json_;
- nlohmann::json consensus_history_;
-
- std::unique_ptr<PrometheusHandler> prometheus_;
- };
-
- } // namespace resdb
\ No newline at end of file
+#pragma once
+
+#include <crow.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <nlohmann/json.hpp>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "boost/asio.hpp"
+#include "boost/beast.hpp"
+#include "platform/common/network/tcp_socket.h"
+#include "platform/proto/resdb.pb.h"
+#include "platform/statistic/prometheus_handler.h"
+#include "proto/kv/kv.pb.h"
+#include "sys/resource.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
+
+namespace resdb {
+
+// Per-transaction telemetry data with automatic JSON serialization
+// One instance is kept per consensus sequence number in Stats'
+// sharded transaction_telemetry_map_.
+struct TransactionTelemetry {
+  // Static info (set once, copied to each transaction)
+  int replica_id = 0;
+  int primary_id = 0;
+  std::string ip;
+  int port = -1;
+
+  // Transaction-specific data
+  uint64_t txn_number = 0;
+  std::vector<std::string> txn_command;
+  std::vector<std::string> txn_key;
+  std::vector<std::string> txn_value;
+
+  // Timestamps. time_point::min() is the "not yet recorded" sentinel
+  // (see the constructor below).
+  std::chrono::system_clock::time_point request_pre_prepare_state_time;
+  std::chrono::system_clock::time_point prepare_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      prepare_message_count_times_list;
+  std::chrono::system_clock::time_point commit_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      commit_message_count_times_list;
+  std::chrono::system_clock::time_point execution_time;
+
+  // Storage Engine Stats
+  double ext_cache_hit_ratio_ = 0.0;
+  std::string level_db_stats_;
+  std::string level_db_approx_mem_size_;
+
+  // Default constructor with min timestamps
+  TransactionTelemetry()
+      : request_pre_prepare_state_time(
+            std::chrono::system_clock::time_point::min()),
+        prepare_state_time(std::chrono::system_clock::time_point::min()),
+        commit_state_time(std::chrono::system_clock::time_point::min()),
+        execution_time(std::chrono::system_clock::time_point::min()) {}
+
+  // Convert to JSON. Read-only: serializes the current state without
+  // mutating any field. Unset timestamps serialize as min()'s raw epoch
+  // count; consumers must treat that value as "not recorded".
+  nlohmann::json to_json() const {
+    nlohmann::json json;
+    json["replica_id"] = replica_id;
+    json["ip"] = ip;
+    json["port"] = port;
+    json["primary_id"] = primary_id;
+    json["propose_pre_prepare_time"] =
+        request_pre_prepare_state_time.time_since_epoch().count();
+    json["prepare_time"] = prepare_state_time.time_since_epoch().count();
+    json["commit_time"] = commit_state_time.time_since_epoch().count();
+    json["execution_time"] = execution_time.time_since_epoch().count();
+
+    json["prepare_message_timestamps"] = nlohmann::json::array();
+    for (const auto& ts : prepare_message_count_times_list) {
+      json["prepare_message_timestamps"].push_back(
+          ts.time_since_epoch().count());
+    }
+
+    json["commit_message_timestamps"] = nlohmann::json::array();
+    for (const auto& ts : commit_message_count_times_list) {
+      json["commit_message_timestamps"].push_back(
+          ts.time_since_epoch().count());
+    }
+
+    json["txn_number"] = txn_number;
+    json["txn_commands"] = txn_command;
+    json["txn_keys"] = txn_key;
+    json["txn_values"] = txn_value;
+    json["ext_cache_hit_ratio"] = ext_cache_hit_ratio_;
+    json["level_db_stats"] = level_db_stats_;
+    json["level_db_approx_mem_size"] = level_db_approx_mem_size_;
+
+    return json;
+  }
+};
+
+// Keep old VisualData for backwards compatibility if needed
+// Legacy single-transaction summary, still used by the deprecated no-seq
+// Stats methods. NOTE(review): unlike TransactionTelemetry, the scalar
+// members here have no default initializers, so a fresh instance carries
+// indeterminate values until SetProps()/SetPrimaryId() run - pre-existing;
+// confirm before relying on them.
+struct VisualData {
+  // Set when initializing
+  int replica_id;
+  int primary_id;
+  std::string ip;
+  int port;
+
+  // Set when new txn is received
+  int txn_number;
+  std::vector<std::string> txn_command;
+  std::vector<std::string> txn_key;
+  std::vector<std::string> txn_value;
+
+  // Request state if primary_id==replica_id, pre_prepare state otherwise
+  std::chrono::system_clock::time_point request_pre_prepare_state_time;
+  std::chrono::system_clock::time_point prepare_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      prepare_message_count_times_list;
+  std::chrono::system_clock::time_point commit_state_time;
+  std::vector<std::chrono::system_clock::time_point>
+      commit_message_count_times_list;
+  std::chrono::system_clock::time_point execution_time;
+
+  // Storage Engine Stats
+  double ext_cache_hit_ratio_;
+  std::string level_db_stats_;
+  std::string level_db_approx_mem_size_;
+
+  // process stats
+  struct rusage process_stats_;
+};
+
+// Process-wide statistics/telemetry singleton: atomic throughput counters
+// consumed by the monitor threads, plus (when resview is enabled) a sharded
+// per-sequence telemetry store exposed over the Crow HTTP routes.
+class Stats {
+ public:
+  static Stats* GetGlobalStats(int sleep_seconds = 5);
+
+  void Stop();
+
+  void RetrieveProgress();
+  void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
+                bool faulty_flag);
+  void SetPrimaryId(int primary_id);
+  void SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                               std::string level_db_stats,
+                               std::string level_db_approx_mem_size);
+  // Deprecated: writes to the shared transaction_summary_; prefer the
+  // per-sequence overload below.
+  void RecordStateTime(std::string state);
+  void RecordStateTime(uint64_t seq, std::string state);
+  void GetTransactionDetails(BatchUserRequest batch_request);
+  // Deprecated: summarizes the shared transaction_summary_; prefer the
+  // per-sequence overload below.
+  void SendSummary();
+  void SendSummary(uint64_t seq);
+  void CrowRoute();
+  bool IsFaulty();
+  void ChangePrimary(int primary_id);
+
+  void AddLatency(uint64_t run_time);
+
+  void Monitor();
+  void MonitorGlobal();
+
+  void IncSocketRecv();
+
+  void IncClientCall();
+
+  void IncClientRequest();
+  void IncPropose();
+  void IncPrepare();
+  void IncPrepare(uint64_t seq);
+  void IncCommit();
+  void IncCommit(uint64_t seq);
+  void IncPendingExecute();
+  void IncExecute();
+  void IncExecuteDone();
+
+  void BroadCastMsg();
+  void SendBroadCastMsg(uint32_t num);
+  void SendBroadCastMsgPerRep();
+  void SeqFail();
+  void IncTotalRequest(uint32_t num);
+  void IncTotalGeoRequest(uint32_t num);
+  void IncGeoRequest();
+
+  void SeqGap(uint64_t seq_gap);
+  // Network in->worker
+  void ServerCall();
+  void ServerProcess();
+  void SetPrometheus(const std::string& prometheus_address);
+
+ protected:
+  Stats(int sleep_time = 5);
+  ~Stats();
+
+ private:
+  std::string monitor_port_ = "default";
+  std::string name_;
+  std::atomic<int> num_call_, run_call_;
+  std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
+  std::thread thread_;
+  std::atomic<bool> begin_;
+  std::atomic<bool> stop_;
+  std::condition_variable cv_;
+  std::mutex mutex_;
+
+  std::thread global_thread_;
+  std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
+      num_commit_, pending_execute_, execute_, execute_done_;
+  std::atomic<uint64_t> client_call_, socket_recv_;
+  std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
+      send_broad_cast_msg_per_rep_;
+  std::atomic<uint64_t> seq_fail_;
+  std::atomic<uint64_t> server_call_, server_process_;
+  std::atomic<uint64_t> run_req_num_;
+  std::atomic<uint64_t> run_req_run_time_;
+  std::atomic<uint64_t> seq_gap_;
+  std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
+  int monitor_sleep_time_ = 5;  // default 5s.
+
+  std::thread crow_thread_;
+  bool enable_resview;
+  bool enable_faulty_switch_;
+
+  // Old single transaction_summary_ for backwards compatibility
+  // TODO: Remove after full migration to sharded map
+  // Guarded by telemetry_mutex_[0] by convention in the deprecated methods.
+  VisualData transaction_summary_;
+
+  // Sharded map solution for per-transaction telemetry
+  // Each shard pairs a map with its own mutex to reduce lock contention;
+  // seq -> shard is computed by GetShardIndex().
+  static const int kTelemetryShards = 256;
+  std::unordered_map<uint64_t, TransactionTelemetry>
+      transaction_telemetry_map_[kTelemetryShards];
+  std::mutex telemetry_mutex_[kTelemetryShards];
+  TransactionTelemetry
+      static_telemetry_info_;  // Static info to copy to new transactions
+  // NOTE(review): finalized_sequences_/IsFinalized()/MarkFinalized() are not
+  // referenced anywhere in the visible implementation - confirm they are
+  // used elsewhere or remove as dead code.
+  mutable std::unordered_set<uint64_t>
+      finalized_sequences_;  // Sequences that have been sent (don't create new
+                             // entries)
+  mutable std::mutex finalized_mutex_;  // Protect finalized_sequences_
+
+  // Helper to get shard index
+  size_t GetShardIndex(uint64_t seq) const;
+  TransactionTelemetry& GetOrCreateTelemetry(uint64_t seq);
+  bool IsFinalized(uint64_t seq) const;
+  void MarkFinalized(uint64_t seq);
+
+  std::atomic<bool> make_faulty_;
+  std::atomic<uint64_t> prev_num_prepare_;
+  std::atomic<uint64_t> prev_num_commit_;
+  nlohmann::json summary_json_;
+  nlohmann::json consensus_history_;
+  std::mutex consensus_history_mutex_;  // Protect consensus_history_ access
+
+  std::unique_ptr<PrometheusHandler> prometheus_;
+};
+
+} // namespace resdb
\ No newline at end of file