This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new a3c1477e reverting stats.cpp to base implementation. TODO: per-request
telemetry tracking (the current tracking gets overwritten, leading to
incorrect telemetry data)
a3c1477e is described below
commit a3c1477e3aa84fffb6b6d2e509a5dd40cb498d31
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 05:12:34 2026 +0000
reverting stats.cpp to base implementation. TODO: per-request telemetry
tracking (the current tracking gets overwritten, leading to incorrect telemetry data)
---
.../consensus/execution/transaction_executor.cpp | 3 -
platform/consensus/ordering/pbft/commitment.cpp | 2 -
platform/networkstrate/service_network.cpp | 11 -
platform/statistic/BUILD | 7 +-
platform/statistic/stats.cpp | 1531 ++++++++------------
platform/statistic/stats.h | 322 ++--
6 files changed, 735 insertions(+), 1141 deletions(-)
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index 71c228a0..bb7cf6be 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -303,7 +303,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(*batch_request_p);
uint64_t seq = request->seq();
- global_stats_->RecordExecuteStart(seq);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -356,7 +355,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
if (response == nullptr) {
response = std::make_unique<BatchUserResponse>();
}
- global_stats_->RecordExecuteEnd(seq);
global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
response->set_proxy_id(batch_request_p->proxy_id());
response->set_createtime(batch_request_p->createtime() +
@@ -366,7 +364,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
response->set_seq(request->seq());
if (post_exec_func_) {
post_exec_func_(std::move(request), std::move(response));
- global_stats_->RecordResponseSent(seq);
}
global_stats_->IncExecuteDone();
diff --git a/platform/consensus/ordering/pbft/commitment.cpp
b/platform/consensus/ordering/pbft/commitment.cpp
index f8ab3d9d..5a970dd6 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -266,7 +266,6 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context>
context,
global_stats_->IncPrepare();
uint64_t seq = request->seq();
int sender_id = request->sender_id();
- global_stats_->RecordPrepareRecv(seq, sender_id);
std::unique_ptr<Request> commit_request = resdb::NewRequest(
Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
commit_request->mutable_data_signature()->Clear();
@@ -311,7 +310,6 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
std::move(request));
}
global_stats_->IncCommit();
- global_stats_->RecordCommitRecv(seq, sender_id);
// Add request to message_manager.
// If it has received enough same requests(2f+1), message manager will
// commit the request.
diff --git a/platform/networkstrate/service_network.cpp
b/platform/networkstrate/service_network.cpp
index 0bc60579..3a05afda 100644
--- a/platform/networkstrate/service_network.cpp
+++ b/platform/networkstrate/service_network.cpp
@@ -74,17 +74,6 @@ void ServiceNetwork::AcceptorHandler(const char* buffer,
size_t data_len) {
item->data = std::move(sub_request_info);
// LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data
// len:"<<item->data->data_len;
-
- // Try to extract seq from request for timeline tracking
- try {
- Request request;
- if (request.ParseFromArray(sub_data.data(), sub_data.size())) {
- global_stats_->RecordNetworkRecv(request.seq());
- }
- } catch (...) {
- // Ignore parse errors, seq extraction is optional
- }
-
global_stats_->ServerCall();
input_queue_.Push(std::move(item));
}
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 042668e1..81cafe7f 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -26,10 +26,6 @@ cc_library(
name = "stats",
srcs = ["stats.cpp"],
hdrs = ["stats.h"],
- copts = select({
- "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"],
- "//conditions:default": [],
- }),
deps = [
":prometheus_handler",
"//common:asio",
@@ -42,7 +38,6 @@ cc_library(
"//proto/kv:kv_cc_proto",
"//third_party:crow",
"//third_party:prometheus",
- "//third_party:leveldb",
],
)
@@ -59,4 +54,4 @@ cc_library(
cc_binary(
name = "set_random_data",
srcs = ["set_random_data.cpp"],
-)
+)
\ No newline at end of file
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 5b4e3aec..1c511b3b 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -17,948 +17,589 @@
* under the License.
*/
-#include "platform/statistic/stats.h"
-
-#include <glog/logging.h>
-
-#include <ctime>
-#include <sstream>
-#include <fstream>
-#include <sys/stat.h>
-#include <errno.h>
-
-#include "common/utils/utils.h"
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/write_batch.h"
-#include "proto/kv/kv.pb.h"
-
-namespace asio = boost::asio;
-namespace beast = boost::beast;
-using tcp = asio::ip::tcp;
-
-namespace resdb {
-
-std::mutex g_mutex;
-Stats* Stats::GetGlobalStats(int seconds) {
- std::unique_lock<std::mutex> lk(g_mutex);
- static Stats stats(seconds);
- return &stats;
-} // gets a singelton instance of Stats Class
-
-Stats::Stats(int sleep_time) {
- monitor_sleep_time_ = sleep_time;
-#ifdef TEST_MODE
- monitor_sleep_time_ = 1;
-#endif
- num_call_ = 0;
- num_commit_ = 0;
- run_time_ = 0;
- run_call_ = 0;
- run_call_time_ = 0;
- server_call_ = 0;
- server_process_ = 0;
- run_req_num_ = 0;
- run_req_run_time_ = 0;
- seq_gap_ = 0;
- total_request_ = 0;
- total_geo_request_ = 0;
- geo_request_ = 0;
-
- stop_ = false;
- begin_ = false;
-
- socket_recv_ = 0;
- broad_cast_msg_ = 0;
- send_broad_cast_msg_ = 0;
-
- prometheus_ = nullptr;
- global_thread_ =
- std::thread(&Stats::MonitorGlobal, this); // pass by reference
-
- transaction_summary_.port = -1;
-
- // Setup websocket here
- make_faulty_.store(false);
- transaction_summary_.request_pre_prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.commit_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.execution_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.txn_number = 0;
-}
-
-void Stats::Stop() { stop_ = true; }
-
-Stats::~Stats() {
- stop_ = true;
- if (global_thread_.joinable()) {
- global_thread_.join();
- }
- if (enable_resview && crow_thread_.joinable()) {
- crow_thread_.join();
- }
-}
-
-int64_t GetRSS() {
- int64_t rss = 0;
- FILE* fp = NULL;
- if ((fp = fopen("/proc/self/statm", "r")) == NULL) {
- return 0;
- }
-
- uint64_t size, resident, share, text, lib, data, dt;
- if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share,
&text,
- &lib, &data, &dt) != 7) {
- fclose(fp);
- return 0;
- }
- fclose(fp);
-
- int64_t page_size = sysconf(_SC_PAGESIZE);
- rss = resident * page_size;
-
- // Convert to MB
- rss = rss / (1024 * 1024);
-
- return rss;
-}
-
-void Stats::CrowRoute() {
- crow::SimpleApp app;
- while (!stop_) {
- try {
- //Deprecated
- CROW_ROUTE(app, "/consensus_data")
- .methods("GET"_method)(
- [this](const crow::request& req, crow::response& res) {
- LOG(ERROR) << "API 1";
- res.set_header("Access-Control-Allow-Origin", "*");
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS");
- res.set_header("Access-Control-Allow-Headers",
- "Content-Type, Authorization");
-
- int limit = 10;
- int page = 1;
-
- std::string url = req.url;
- size_t query_pos = url.find('?');
- if (query_pos != std::string::npos) {
- std::string query_string = url.substr(query_pos + 1);
- std::istringstream iss(query_string);
- std::string param;
- while (std::getline(iss, param, '&')) {
- size_t eq_pos = param.find('=');
- if (eq_pos != std::string::npos) {
- std::string key = param.substr(0, eq_pos);
- std::string value = param.substr(eq_pos + 1);
- if (key == "limit") {
- try {
- limit = std::stoi(value);
- if (limit <= 0) limit = 10;
- } catch (...) {
- limit = 10;
- }
- } else if (key == "page") {
- try {
- page = std::stoi(value);
- if (page <= 0) page = 1;
- } catch (...) {
- page = 1;
- }
- }
- }
- }
- }
-
- nlohmann::json result;
- int total_count = 0;
- int offset = (page - 1) * limit;
- int current_index = 0;
- int items_collected = 0;
-
- if (summary_db_) {
- std::lock_guard<std::mutex> lock(summary_db_mutex_);
- leveldb::Iterator* it =
- summary_db_->NewIterator(leveldb::ReadOptions());
-
- // Count only non-timeline entries
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::string key = it->key().ToString();
- if (key.find("timeline_") != 0) {
- total_count++;
- }
- }
-
- // Iterate and collect only non-timeline entries
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::string key = it->key().ToString();
- // Skip timeline entries
- if (key.find("timeline_") == 0) {
- continue;
- }
-
- if (current_index >= offset && items_collected < limit) {
- try {
- result[key] =
- nlohmann::json::parse(it->value().ToString());
- items_collected++;
- } catch (...) {
- res.code = 500;
- result["error"] = "Failed to parse transaction data";
- delete it;
- res.body = result.dump();
- res.end();
- return;
- }
- }
- current_index++;
- }
- delete it;
- }
-
- res.body = result.dump();
- res.end();
- });
- CROW_ROUTE(app, "/get_status")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 2";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- // Send your response
- res.body = IsFaulty() ? "Faulty" : "Not Faulty";
- res.end();
- });
- CROW_ROUTE(app, "/make_faulty")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 3";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- res.body = "Not Enabled";
- // Send your response
- if (enable_faulty_switch_) {
- make_faulty_.store(!make_faulty_.load());
- res.body = "Success";
- }
- res.end();
- });
- CROW_ROUTE(app, "/transaction_data")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res) {
- LOG(ERROR) << "API 4";
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- nlohmann::json mem_view_json;
- int status =
- getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
- if (status == 0) {
- mem_view_json["resident_set_size"] = GetRSS();
- mem_view_json["max_resident_set_size"] =
- transaction_summary_.process_stats_.ru_maxrss;
- mem_view_json["num_reads"] =
- transaction_summary_.process_stats_.ru_inblock;
- mem_view_json["num_writes"] =
- transaction_summary_.process_stats_.ru_oublock;
- }
-
- mem_view_json["ext_cache_hit_ratio"] =
- transaction_summary_.ext_cache_hit_ratio_;
- mem_view_json["level_db_stats"] =
- transaction_summary_.level_db_stats_;
- mem_view_json["level_db_approx_mem_size"] =
- transaction_summary_.level_db_approx_mem_size_;
- res.body = mem_view_json.dump();
- mem_view_json.clear();
- res.end();
- });
- CROW_ROUTE(app, "/consensus_data/<int>")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res, int txn_number) {
- LOG(ERROR) << "API 5: Get transaction " << txn_number;
- res.set_header("Access-Control-Allow-Origin",
- "*"); // Allow requests from any origin
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS"); // Specify allowed methods
- res.set_header(
- "Access-Control-Allow-Headers",
- "Content-Type, Authorization"); // Specify allowed headers
-
- nlohmann::json result;
- if (summary_db_) {
- std::lock_guard<std::mutex> lock(summary_db_mutex_);
- std::string txn_key = std::to_string(txn_number);
- std::string value;
- leveldb::Status status =
- summary_db_->Get(leveldb::ReadOptions(), txn_key, &value);
-
- if (status.ok() && !value.empty()) {
- try {
- result[txn_key] = nlohmann::json::parse(value);
- } catch (...) {
- res.code = 500;
- result["error"] = "Failed to parse transaction data";
- }
- } else {
- res.code = 404;
- result["error"] = "Transaction not found";
- result["txn_number"] = txn_number;
- }
- } else {
- res.code = 503;
- result["error"] = "Storage not initialized";
- }
-
- res.body = result.dump();
- res.end();
- });
-
- CROW_ROUTE(app, "/transaction_timeline/<int>")
- .methods("GET"_method)([this](const crow::request& req,
- crow::response& res, int txn_number) {
- LOG(ERROR) << "API 6: Get transaction timeline " << txn_number;
- res.set_header("Access-Control-Allow-Origin", "*");
- res.set_header("Access-Control-Allow-Methods",
- "GET, POST, OPTIONS");
- res.set_header("Access-Control-Allow-Headers",
- "Content-Type, Authorization");
-
- nlohmann::json result;
- result["txn_number"] = txn_number;
-
- if (!summary_db_) {
- res.code = 503;
- result["error"] = "Storage not initialized";
- res.body = result.dump();
- res.end();
- return;
- }
-
- std::lock_guard<std::mutex> lock(summary_db_mutex_);
- std::string txn_key = std::to_string(txn_number);
- std::string timeline_key = "timeline_" + txn_key;
-
- std::string txn_value;
- leveldb::Status status =
- summary_db_->Get(leveldb::ReadOptions(), txn_key, &txn_value);
-
- if (status.ok() && !txn_value.empty()) {
- try {
- nlohmann::json txn_summary = nlohmann::json::parse(txn_value);
- nlohmann::json txn_details;
- if (txn_summary.contains("txn_commands")) {
- txn_details["txn_commands"] = txn_summary["txn_commands"];
- }
- if (txn_summary.contains("txn_keys")) {
- txn_details["txn_keys"] = txn_summary["txn_keys"];
- }
- if (txn_summary.contains("txn_values")) {
- txn_details["txn_values"] = txn_summary["txn_values"];
- }
- if (txn_summary.contains("propose_pre_prepare_time")) {
- txn_details["propose_pre_prepare_time"] =
- txn_summary["propose_pre_prepare_time"];
- }
- if (txn_summary.contains("prepare_time")) {
- txn_details["prepare_time"] = txn_summary["prepare_time"];
- }
- if (txn_summary.contains("commit_time")) {
- txn_details["commit_time"] = txn_summary["commit_time"];
- }
- if (txn_summary.contains("execution_time")) {
- txn_details["execution_time"] =
txn_summary["execution_time"];
- }
- result["transaction_details"] = txn_details;
- } catch (...) {
- LOG(ERROR) << "Failed to parse transaction summary for txn: "
- << txn_number;
- }
- }
-
- std::string timeline_value;
- status = summary_db_->Get(leveldb::ReadOptions(), timeline_key,
&timeline_value);
-
- if (status.ok() && !timeline_value.empty()) {
- try {
- nlohmann::json events_array =
nlohmann::json::parse(timeline_value);
- result["timeline"] = events_array;
- } catch (...) {
- LOG(ERROR) << "Failed to parse timeline for txn: " <<
txn_number;
- }
- } else {
- LOG(ERROR) << "Timeline not found for txn: " << txn_number;
- }
-
- res.body = result.dump();
- res.end();
- });
- app.port(8500 + transaction_summary_.port).multithreaded().run();
- sleep(1);
- } catch (const std::exception& e) {
- }
- }
- app.stop();
-}
-
-bool Stats::IsFaulty() { return make_faulty_.load(); }
-
-void Stats::ChangePrimary(int primary_id) {
- transaction_summary_.primary_id = primary_id;
- make_faulty_.store(false);
-}
-
-void Stats::SetProps(int replica_id, std::string ip, int port,
- bool resview_flag, bool faulty_flag) {
- transaction_summary_.replica_id = replica_id;
- transaction_summary_.ip = ip;
- transaction_summary_.port = port;
- enable_resview = resview_flag;
- enable_faulty_switch_ = faulty_flag;
- if (resview_flag) {
- // Single data directory for both summaries and timeline
- std::string data_dir = "./resdb_data_" + std::to_string(port);
- int mkdir_result = mkdir(data_dir.c_str(), 0755);
- if (mkdir_result != 0 && errno != EEXIST) {
- LOG(ERROR) << "Failed to create data directory: " << data_dir;
- }
-
- // Initialize LevelDB for both transaction summaries and timeline events
- std::string summary_path = data_dir + "/summaries";
- leveldb::Options options;
- options.create_if_missing = true;
- leveldb::DB* db = nullptr;
- leveldb::Status status = leveldb::DB::Open(options, summary_path, &db);
- if (status.ok()) {
- summary_db_.reset(db);
- LOG(INFO) << "Initialized LevelDB storage at: " << summary_path;
- } else {
- LOG(ERROR) << "Failed to open LevelDB at " << summary_path << ": "
- << status.ToString();
- }
-
- crow_thread_ = std::thread(&Stats::CrowRoute, this);
- }
-}
-
-void Stats::SetPrimaryId(int primary_id) {
- transaction_summary_.primary_id = primary_id;
-}
-
-void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
- std::string level_db_stats,
- std::string level_db_approx_mem_size) {
- transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
- transaction_summary_.level_db_stats_ = level_db_stats;
- transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size;
-}
-
-void Stats::RecordStateTime(std::string state) {
- if (!enable_resview) {
- return;
- }
- if (state == "request" || state == "pre-prepare") {
- transaction_summary_.request_pre_prepare_state_time =
- std::chrono::system_clock::now();
- } else if (state == "prepare") {
- transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
- } else if (state == "commit") {
- transaction_summary_.commit_state_time = std::chrono::system_clock::now();
- }
-}
-
-void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
- if (!enable_resview) {
- return;
- }
- transaction_summary_.txn_number = batch_request.seq();
- transaction_summary_.txn_command.clear();
- transaction_summary_.txn_key.clear();
- transaction_summary_.txn_value.clear();
- for (auto& sub_request : batch_request.user_requests()) {
- KVRequest kv_request;
- if (!kv_request.ParseFromString(sub_request.request().data())) {
- break;
- }
- if (kv_request.cmd() == KVRequest::SET) {
- transaction_summary_.txn_command.push_back("SET");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back(kv_request.value());
- } else if (kv_request.cmd() == KVRequest::GET) {
- transaction_summary_.txn_command.push_back("GET");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back("");
- } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
- transaction_summary_.txn_command.push_back("GETALLVALUES");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back("");
- } else if (kv_request.cmd() == KVRequest::GETRANGE) {
- transaction_summary_.txn_command.push_back("GETRANGE");
- transaction_summary_.txn_key.push_back(kv_request.key());
- transaction_summary_.txn_value.push_back(kv_request.value());
- }
- }
-}
-
-void Stats::SendSummary() {
- if (!enable_resview) {
- return;
- }
- transaction_summary_.execution_time = std::chrono::system_clock::now();
-
- // Convert Transaction Summary to JSON
- summary_json_["replica_id"] = transaction_summary_.replica_id;
- summary_json_["ip"] = transaction_summary_.ip;
- summary_json_["port"] = transaction_summary_.port;
- summary_json_["primary_id"] = transaction_summary_.primary_id;
- summary_json_["propose_pre_prepare_time"] =
- transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
- .count();
- summary_json_["prepare_time"] =
- transaction_summary_.prepare_state_time.time_since_epoch().count();
- summary_json_["commit_time"] =
- transaction_summary_.commit_state_time.time_since_epoch().count();
- summary_json_["execution_time"] =
- transaction_summary_.execution_time.time_since_epoch().count();
- for (size_t i = 0;
- i < transaction_summary_.prepare_message_count_times_list.size(); i++) {
- summary_json_["prepare_message_timestamps"].push_back(
- transaction_summary_.prepare_message_count_times_list[i]
- .time_since_epoch()
- .count());
- }
- for (size_t i = 0;
- i < transaction_summary_.commit_message_count_times_list.size(); i++) {
- summary_json_["commit_message_timestamps"].push_back(
- transaction_summary_.commit_message_count_times_list[i]
- .time_since_epoch()
- .count());
- }
- summary_json_["txn_number"] = transaction_summary_.txn_number;
- for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
- summary_json_["txn_commands"].push_back(
- transaction_summary_.txn_command[i]);
- }
- for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
- summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
- }
- for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
- summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
- }
-
- summary_json_["ext_cache_hit_ratio"] =
- transaction_summary_.ext_cache_hit_ratio_;
-
- std::string txn_key = std::to_string(transaction_summary_.txn_number);
-
- if (summary_db_) {
- std::lock_guard<std::mutex> lock(summary_db_mutex_);
-
- // Write transaction summary
- leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key,
- summary_json_.dump());
- if (!status.ok()) {
- LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key
- << ": " << status.ToString();
- }
-
- // Batch write all buffered timeline events for this transaction
- {
- std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
- auto it = timeline_buffer_.find(transaction_summary_.txn_number);
- if (it != timeline_buffer_.end() && !it->second.empty()) {
- std::string timeline_key = "timeline_" + txn_key;
-
- // Read existing timeline events if they exist, then merge
- nlohmann::json events_array;
- std::string existing_value;
- leveldb::Status read_status = summary_db_->Get(leveldb::ReadOptions(),
timeline_key, &existing_value);
- if (read_status.ok() && !existing_value.empty()) {
- try {
- events_array = nlohmann::json::parse(existing_value);
- // Ensure it's an array
- if (!events_array.is_array()) {
- events_array = nlohmann::json::array();
- }
- } catch (...) {
- // If parsing fails, start with empty array
- events_array = nlohmann::json::array();
- }
- } else {
- events_array = nlohmann::json::array();
- }
-
- // Append new events to existing array
- for (const auto& event : it->second) {
- events_array.push_back(event);
- }
-
- // Write merged timeline events back to LevelDB
- status = summary_db_->Put(leveldb::WriteOptions(), timeline_key,
- events_array.dump());
- if (!status.ok()) {
- LOG(ERROR) << "Failed to write timeline to storage for txn: " <<
txn_key
- << ": " << status.ToString();
- }
- // Clean up buffer for this txn
- timeline_buffer_.erase(it);
- }
- }
- }
-
- LOG(ERROR) << summary_json_.dump();
-
- // Reset Transaction Summary Parameters
- transaction_summary_.request_pre_prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.commit_state_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.execution_time =
- std::chrono::system_clock::time_point::min();
- transaction_summary_.prepare_message_count_times_list.clear();
- transaction_summary_.commit_message_count_times_list.clear();
-
- summary_json_.clear();
-}
-
-void Stats::WriteTimelineEvent(uint64_t seq, const std::string& phase, int
sender_id) {
- if (!enable_resview) {
- return;
- }
-
- // Buffer timeline events in memory - written to LevelDB in batch during
SendSummary()
- std::lock_guard<std::mutex> lock(timeline_buffer_mutex_);
-
- // Create new event
- nlohmann::json event;
- event["timestamp"] =
std::chrono::system_clock::now().time_since_epoch().count();
- event["phase"] = phase;
- if (sender_id >= 0) {
- event["sender_id"] = sender_id;
- }
-
- // Append to in-memory buffer for this seq
- timeline_buffer_[seq].push_back(event);
-}
-
-void Stats::RecordNetworkRecv(uint64_t seq) {
- WriteTimelineEvent(seq, "network_recv");
-}
-
-void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) {
- WriteTimelineEvent(seq, "prepare_recv", sender_id);
-}
-
-void Stats::RecordCommitRecv(uint64_t seq, int sender_id) {
- WriteTimelineEvent(seq, "commit_recv", sender_id);
-}
-
-void Stats::RecordExecuteStart(uint64_t seq) {
- WriteTimelineEvent(seq, "execute_start");
-}
-
-void Stats::RecordExecuteEnd(uint64_t seq) {
- WriteTimelineEvent(seq, "execute_end");
-}
-
-void Stats::RecordResponseSent(uint64_t seq) {
- WriteTimelineEvent(seq, "response_sent");
-}
-
-void Stats::CleanupOldTimelineEntries() {
- // Periodically clean up old buffer entries to prevent memory leaks
- // Keep only entries within the last 10000 sequence numbers
- std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_);
-
- if (timeline_buffer_.empty()) {
- return;
- }
-
- // Get the highest sequence number we've seen
- uint64_t max_seq = timeline_buffer_.rbegin()->first;
- uint64_t cleanup_threshold = (max_seq > 10000) ? (max_seq - 10000) : 0;
-
- // Remove all entries below the threshold
- auto it = timeline_buffer_.begin();
- while (it != timeline_buffer_.end() && it->first < cleanup_threshold) {
- it = timeline_buffer_.erase(it);
- }
-
- size_t buffer_size = timeline_buffer_.size();
- if (buffer_size > 1000) {
- LOG(WARNING) << "Timeline buffer size is large: " << buffer_size
- << " entries. This may indicate transactions not completing.";
- }
-}
-
-void Stats::MonitorGlobal() {
- LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
-
- uint64_t seq_fail = 0;
- uint64_t client_call = 0, socket_recv = 0;
- uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit =
0,
- pending_execute = 0, execute = 0, execute_done = 0;
- uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
- uint64_t send_broad_cast_msg_per_rep = 0;
- uint64_t server_call = 0, server_process = 0;
- uint64_t seq_gap = 0;
- uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
-
- // ====== for client proxy ======
- uint64_t run_req_num = 0, run_req_run_time = 0;
-
- uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
- // =============================
-
- uint64_t last_seq_fail = 0;
- uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
- last_num_commit = 0;
- uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
- uint64_t last_client_call = 0, last_socket_recv = 0;
- uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
- uint64_t last_send_broad_cast_msg_per_rep = 0;
- uint64_t last_server_call = 0, last_server_process = 0;
- uint64_t last_total_request = 0, last_total_geo_request = 0,
- last_geo_request = 0;
- uint64_t time = 0;
-
- while (!stop_) {
- sleep(monitor_sleep_time_);
- time += monitor_sleep_time_;
-
- // Periodically clean up old timeline buffer entries to prevent memory
leaks
- CleanupOldTimelineEntries();
-
- seq_fail = seq_fail_;
- socket_recv = socket_recv_;
- client_call = client_call_;
- num_client_req = num_client_req_;
- num_propose = num_propose_;
- num_prepare = num_prepare_;
- num_commit = num_commit_;
- pending_execute = pending_execute_;
- execute = execute_;
- execute_done = execute_done_;
- broad_cast_msg = broad_cast_msg_;
- send_broad_cast_msg = send_broad_cast_msg_;
- send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
- server_call = server_call_;
- server_process = server_process_;
- seq_gap = seq_gap_;
- total_request = total_request_;
- total_geo_request = total_geo_request_;
- geo_request = geo_request_;
-
- run_req_num = run_req_num_;
- run_req_run_time = run_req_run_time_;
-
- LOG(ERROR) << "=========== monitor =========\n"
- << "server call:" << server_call - last_server_call
- << " server process:" << server_process - last_server_process
- << " socket recv:" << socket_recv - last_socket_recv
- << " "
- "client call:"
- << client_call - last_client_call
- << " "
- "client req:"
- << num_client_req - last_num_client_req
- << " "
- "broad_cast:"
- << broad_cast_msg - last_broad_cast_msg
- << " "
- "send broad_cast:"
- << send_broad_cast_msg - last_send_broad_cast_msg
- << " "
- "per send broad_cast:"
- << send_broad_cast_msg_per_rep -
last_send_broad_cast_msg_per_rep
- << " "
- "propose:"
- << num_propose - last_num_propose
- << " "
- "prepare:"
- << (num_prepare - last_num_prepare)
- << " "
- "commit:"
- << (num_commit - last_num_commit)
- << " "
- "pending execute:"
- << pending_execute - last_pending_execute
- << " "
- "execute:"
- << execute - last_execute
- << " "
- "execute done:"
- << execute_done - last_execute_done << " seq gap:" << seq_gap
- << " total request:" << total_request - last_total_request
- << " txn:" << (total_request - last_total_request) / 5
- << " total geo request:"
- << total_geo_request - last_total_geo_request
- << " total geo request per:"
- << (total_geo_request - last_total_geo_request) / 5
- << " geo request:" << (geo_request - last_geo_request)
- << " "
- "seq fail:"
- << seq_fail - last_seq_fail << " time:" << time
- << " "
- "\n--------------- monitor ------------";
- if (run_req_num - last_run_req_num > 0) {
- LOG(ERROR) << " req client latency:"
- << static_cast<double>(run_req_run_time -
- last_run_req_run_time) /
- (run_req_num - last_run_req_num) / 1000000000.0;
- }
-
- last_seq_fail = seq_fail;
- last_socket_recv = socket_recv;
- last_client_call = client_call;
- last_num_client_req = num_client_req;
- last_num_propose = num_propose;
- last_num_prepare = num_prepare;
- last_num_commit = num_commit;
- last_pending_execute = pending_execute;
- last_execute = execute;
- last_execute_done = execute_done;
-
- last_broad_cast_msg = broad_cast_msg;
- last_send_broad_cast_msg = send_broad_cast_msg;
- last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
-
- last_server_call = server_call;
- last_server_process = server_process;
-
- last_run_req_num = run_req_num;
- last_run_req_run_time = run_req_run_time;
- last_total_request = total_request;
- last_total_geo_request = total_geo_request;
- last_geo_request = geo_request;
- }
-}
-
-// Pre-revert counter implementations. Each bumps an atomic counter that
-// MonitorGlobal() reports and, when a PrometheusHandler is configured,
-// mirrors the increment to Prometheus.
-void Stats::IncClientCall() {
- if (prometheus_) {
- prometheus_->Inc(CLIENT_CALL, 1);
- }
- client_call_++;
-}
-
-void Stats::IncClientRequest() {
- if (prometheus_) {
- prometheus_->Inc(CLIENT_REQ, 1);
- }
- num_client_req_++;
-}
-
-void Stats::IncPropose() {
- if (prometheus_) {
- prometheus_->Inc(PROPOSE, 1);
- }
- num_propose_++;
-}
-
-// Unlike the replacement version, this only records the per-message
-// timestamp when ResView is enabled.
-void Stats::IncPrepare() {
- if (prometheus_) {
- prometheus_->Inc(PREPARE, 1);
- }
- num_prepare_++;
- if (enable_resview) {
- transaction_summary_.prepare_message_count_times_list.push_back(
- std::chrono::system_clock::now());
- }
-}
-
-// Same ResView guard as IncPrepare().
-void Stats::IncCommit() {
- if (prometheus_) {
- prometheus_->Inc(COMMIT, 1);
- }
- num_commit_++;
- if (enable_resview) {
- transaction_summary_.commit_message_count_times_list.push_back(
- std::chrono::system_clock::now());
- }
-}
-
-void Stats::IncPendingExecute() { pending_execute_++; }
-
-void Stats::IncExecute() { execute_++; }
-
-void Stats::IncExecuteDone() {
- if (prometheus_) {
- prometheus_->Inc(EXECUTE, 1);
- }
- execute_done_++;
-}
-
-void Stats::BroadCastMsg() {
- if (prometheus_) {
- prometheus_->Inc(BROAD_CAST, 1);
- }
- broad_cast_msg_++;
-}
-
-void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
-
-void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
-
-void Stats::SeqFail() { seq_fail_++; }
-
-void Stats::IncTotalRequest(uint32_t num) {
- if (prometheus_) {
- prometheus_->Inc(NUM_EXECUTE_TX, num);
- }
- total_request_ += num;
-}
-
-void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
-
-void Stats::IncGeoRequest() { geo_request_++; }
-
-void Stats::ServerCall() {
- if (prometheus_) {
- prometheus_->Inc(SERVER_CALL_NAME, 1);
- }
- server_call_++;
-}
-
-void Stats::ServerProcess() {
- if (prometheus_) {
- prometheus_->Inc(SERVER_PROCESS, 1);
- }
- server_process_++;
-}
-
-void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
-
-void Stats::AddLatency(uint64_t run_time) {
- run_req_num_++;
- run_req_run_time_ += run_time;
-}
-
-void Stats::SetPrometheus(const std::string& prometheus_address) {
- prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
-}
-
-} // namespace resdb
+ #include "platform/statistic/stats.h"
+
+ #include <glog/logging.h>
+
+ #include <ctime>
+
+ #include "common/utils/utils.h"
+ #include "proto/kv/kv.pb.h"
+
+ namespace asio = boost::asio;
+ namespace beast = boost::beast;
+ using tcp = asio::ip::tcp;
+
+ namespace resdb {
+
+ std::mutex g_mutex;
+
+ // Returns the process-wide Stats singleton. The instance is built on the
+ // first call, so `seconds` (the monitor interval) only takes effect then;
+ // values passed on later calls are ignored.
+ Stats* Stats::GetGlobalStats(int seconds) {
+   std::lock_guard<std::mutex> guard(g_mutex);
+   static Stats instance(seconds);
+   return &instance;
+ }
+
+ // Initializes every counter and flag, resets the ResView per-transaction
+ // summary, and finally starts the background MonitorGlobal() thread.
+ //
+ // `sleep_time` is the monitor logging interval in seconds; it is forced to 1
+ // when built with TEST_MODE.
+ Stats::Stats(int sleep_time) {
+   monitor_sleep_time_ = sleep_time;
+ #ifdef TEST_MODE
+   monitor_sleep_time_ = 1;
+ #endif
+   // Previously only some counters were reset here. Every counter read by
+   // MonitorGlobal() or mirrored to Prometheus is now explicitly zeroed.
+   num_call_ = 0;
+   run_call_ = 0;
+   last_time_ = 0;
+   run_time_ = 0;
+   run_call_time_ = 0;
+
+   num_client_req_ = 0;
+   num_propose_ = 0;
+   num_prepare_ = 0;
+   num_commit_ = 0;
+   pending_execute_ = 0;
+   execute_ = 0;
+   execute_done_ = 0;
+   client_call_ = 0;
+   socket_recv_ = 0;
+   broad_cast_msg_ = 0;
+   send_broad_cast_msg_ = 0;
+   send_broad_cast_msg_per_rep_ = 0;
+   seq_fail_ = 0;
+   server_call_ = 0;
+   server_process_ = 0;
+   run_req_num_ = 0;
+   run_req_run_time_ = 0;
+   seq_gap_ = 0;
+   total_request_ = 0;
+   total_geo_request_ = 0;
+   geo_request_ = 0;
+   prev_num_prepare_ = 0;
+   prev_num_commit_ = 0;
+
+   stop_ = false;
+   begin_ = false;
+   // ResView stays off until SetProps() turns it on. These flags are read by
+   // IncPrepare()/IncCommit()/SendSummary() and the destructor.
+   enable_resview = false;
+   enable_faulty_switch_ = false;
+   make_faulty_.store(false);
+   prometheus_ = nullptr;
+
+   transaction_summary_.port = -1;
+   transaction_summary_.txn_number = 0;
+   transaction_summary_.request_pre_prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.commit_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.execution_time =
+       std::chrono::system_clock::time_point::min();
+
+   // Start the monitor last. It reads the counters above, so they must be
+   // initialized before the thread can observe them.
+   global_thread_ = std::thread(&Stats::MonitorGlobal, this);
+ }
+
+ // Asks the monitor loop and the ResView HTTP loop (both poll stop_) to exit.
+ // Does not wait for them; the destructor joins the threads.
+ void Stats::Stop() {
+   stop_.store(true);
+ }
+
+ // Signals the background threads to stop and joins them.
+ Stats::~Stats() {
+   stop_ = true;
+   // MonitorGlobal() only re-checks stop_ after each sleep, so this join can
+   // take up to one monitor interval.
+   if (global_thread_.joinable()) {
+     global_thread_.join();
+   }
+   // Join whenever the thread exists. The previous `enable_resview &&` guard
+   // left a joinable std::thread to be destroyed (std::terminate) if ResView
+   // had been switched off after the thread was started.
+   // NOTE(review): CrowRoute() blocks in app.run() and nothing here calls
+   // app.stop(), so this join may wait until the HTTP server returns on its
+   // own; confirm shutdown behavior.
+   if (crow_thread_.joinable()) {
+     crow_thread_.join();
+   }
+ }
+
+ // Returns the resident set size of this process in MiB, read from
+ // /proc/self/statm (Linux-specific). Returns 0 if the file cannot be opened
+ // or parsed, or if the system page size is unavailable.
+ int64_t GetRSS() {
+   FILE* fp = fopen("/proc/self/statm", "r");
+   if (fp == nullptr) {
+     return 0;
+   }
+
+   // statm reports seven sizes, in pages. The variables are `unsigned long` so
+   // they match %lu exactly; uint64_t is not `unsigned long` on every
+   // platform, which made the previous fscanf call undefined behavior.
+   unsigned long size = 0, resident = 0, share = 0, text = 0, lib = 0,
+                 data = 0, dt = 0;
+   const int fields = fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size,
+                             &resident, &share, &text, &lib, &data, &dt);
+   fclose(fp);
+   if (fields != 7) {
+     return 0;
+   }
+
+   // sysconf returns -1 on failure; don't turn that into a negative RSS.
+   const long page_size = sysconf(_SC_PAGESIZE);
+   if (page_size <= 0) {
+     return 0;
+   }
+
+   // pages -> bytes -> MiB
+   const int64_t rss_bytes =
+       static_cast<int64_t>(resident) * static_cast<int64_t>(page_size);
+   return rss_bytes / (1024 * 1024);
+ }
+
+ // Sets the permissive CORS headers returned by every ResView endpoint so the
+ // dashboard can call them from any origin.
+ static void AddResViewCorsHeaders(crow::response& res) {
+   res.set_header("Access-Control-Allow-Origin",
+                  "*");  // Allow requests from any origin
+   res.set_header("Access-Control-Allow-Methods",
+                  "GET, POST, OPTIONS");  // Specify allowed methods
+   res.set_header("Access-Control-Allow-Headers",
+                  "Content-Type, Authorization");  // Specify allowed headers
+ }
+
+ // Runs the ResView HTTP API on port 8500 + transaction_summary_.port until
+ // stop_ is set. Endpoints (all GET):
+ //   /consensus_data   - JSON of every record stored by SendSummary().
+ //   /get_status       - "Faulty" or "Not Faulty".
+ //   /make_faulty      - toggles faulty mode if enable_faulty_switch_ is set.
+ //   /transaction_data - process rusage/RSS and storage-engine metrics.
+ // If app.run() returns or throws, it is retried after a one-second pause.
+ void Stats::CrowRoute() {
+   crow::SimpleApp app;
+
+   // Register each route exactly once. They were previously re-registered on
+   // the same app on every iteration of the retry loop.
+   CROW_ROUTE(app, "/consensus_data")
+       .methods("GET"_method)([this](const crow::request& /*req*/,
+                                     crow::response& res) {
+         LOG(ERROR) << "API 1";
+         AddResViewCorsHeaders(res);
+         // NOTE(review): consensus_history_ is written by SendSummary() on
+         // another thread without a lock; this read can race with it.
+         res.body = consensus_history_.dump();
+         res.end();
+       });
+   CROW_ROUTE(app, "/get_status")
+       .methods("GET"_method)([this](const crow::request& /*req*/,
+                                     crow::response& res) {
+         LOG(ERROR) << "API 2";
+         AddResViewCorsHeaders(res);
+         res.body = IsFaulty() ? "Faulty" : "Not Faulty";
+         res.end();
+       });
+   CROW_ROUTE(app, "/make_faulty")
+       .methods("GET"_method)([this](const crow::request& /*req*/,
+                                     crow::response& res) {
+         LOG(ERROR) << "API 3";
+         AddResViewCorsHeaders(res);
+         // NOTE(review): a GET that mutates state; the toggle only happens
+         // when the faulty switch was enabled through SetProps().
+         if (enable_faulty_switch_) {
+           make_faulty_.store(!make_faulty_.load());
+         }
+         res.body = "Success";
+         res.end();
+       });
+   CROW_ROUTE(app, "/transaction_data")
+       .methods("GET"_method)([this](const crow::request& /*req*/,
+                                     crow::response& res) {
+         LOG(ERROR) << "API 4";
+         AddResViewCorsHeaders(res);
+
+         nlohmann::json mem_view_json;
+         int status =
+             getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
+         if (status == 0) {
+           mem_view_json["resident_set_size"] = GetRSS();
+           mem_view_json["max_resident_set_size"] =
+               transaction_summary_.process_stats_.ru_maxrss;
+           mem_view_json["num_reads"] =
+               transaction_summary_.process_stats_.ru_inblock;
+           mem_view_json["num_writes"] =
+               transaction_summary_.process_stats_.ru_oublock;
+         }
+
+         mem_view_json["ext_cache_hit_ratio"] =
+             transaction_summary_.ext_cache_hit_ratio_;
+         mem_view_json["level_db_stats"] =
+             transaction_summary_.level_db_stats_;
+         mem_view_json["level_db_approx_mem_size"] =
+             transaction_summary_.level_db_approx_mem_size_;
+         res.body = mem_view_json.dump();
+         res.end();
+       });
+
+   while (!stop_) {
+     try {
+       app.port(8500 + transaction_summary_.port).multithreaded().run();
+     } catch (const std::exception& e) {
+       // Previously swallowed silently; keep the retry but make it visible.
+       LOG(ERROR) << "ResView HTTP server failed: " << e.what();
+     }
+     // Always pause before retrying. An exception used to skip this sleep,
+     // so a persistent failure turned into a busy loop.
+     sleep(1);
+   }
+   app.stop();
+ }
+
+ // Reports whether this replica is currently in faulty mode (toggled through
+ // the ResView /make_faulty endpoint, cleared by ChangePrimary()).
+ bool Stats::IsFaulty() {
+   const bool faulty = make_faulty_.load();
+   return faulty;
+ }
+
+ // Records the new primary for ResView summaries and leaves faulty mode.
+ void Stats::ChangePrimary(int primary_id) {
+   VisualData& summary = transaction_summary_;
+   summary.primary_id = primary_id;
+   make_faulty_ = false;
+ }
+
+ // Sets this replica's identity for ResView and the ResView/faulty-switch
+ // feature flags. When `resview_flag` is true, starts the ResView HTTP server
+ // (CrowRoute) on its own thread.
+ void Stats::SetProps(int replica_id, std::string ip, int port,
+                      bool resview_flag, bool faulty_flag) {
+   transaction_summary_.replica_id = replica_id;
+   transaction_summary_.ip = std::move(ip);  // `ip` is a by-value sink.
+   transaction_summary_.port = port;
+   enable_resview = resview_flag;
+   enable_faulty_switch_ = faulty_flag;
+   // Move-assigning onto a std::thread that is still joinable calls
+   // std::terminate, so start the HTTP server at most once even if SetProps
+   // is called again.
+   if (resview_flag && !crow_thread_.joinable()) {
+     crow_thread_ = std::thread(&Stats::CrowRoute, this);
+   }
+ }
+
+ // Updates the primary id reported in each ResView summary.
+ void Stats::SetPrimaryId(int primary_id) {
+   VisualData& summary = transaction_summary_;
+   summary.primary_id = primary_id;
+ }
+
+ // Stores the latest storage-engine metrics. They are served by the
+ // /transaction_data endpoint, and ext_cache_hit_ratio is also written into
+ // every SendSummary() record.
+ void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                                     std::string level_db_stats,
+                                     std::string level_db_approx_mem_size) {
+   transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
+   // The strings arrive by value; move them instead of copying a second time.
+   transaction_summary_.level_db_stats_ = std::move(level_db_stats);
+   transaction_summary_.level_db_approx_mem_size_ =
+       std::move(level_db_approx_mem_size);
+ }
+
+ // Stamps the current wall-clock time for one consensus phase of the
+ // transaction ResView is tracking. Recognized states are "request" /
+ // "pre-prepare", "prepare" and "commit"; any other value is ignored.
+ // No-op unless ResView is enabled.
+ void Stats::RecordStateTime(std::string state) {
+   if (!enable_resview) {
+     return;
+   }
+   const auto now = std::chrono::system_clock::now();
+   if (state == "request" || state == "pre-prepare") {
+     transaction_summary_.request_pre_prepare_state_time = now;
+   } else if (state == "prepare") {
+     transaction_summary_.prepare_state_time = now;
+   } else if (state == "commit") {
+     transaction_summary_.commit_state_time = now;
+   }
+ }
+
+ // Captures the sequence number and the command/key/value of each KV
+ // sub-request in `batch_request`, for the next SendSummary() call.
+ // Parsing stops at the first sub-request that is not a valid KVRequest;
+ // commands other than SET/GET/GETALLVALUES/GETRANGE are skipped. Values are
+ // kept only for SET and GETRANGE. No-op unless ResView is enabled.
+ // NOTE(review): VisualData::txn_number is an int; confirm seq() fits.
+ void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+   if (!enable_resview) {
+     return;
+   }
+   VisualData& summary = transaction_summary_;
+   summary.txn_number = batch_request.seq();
+   summary.txn_command.clear();
+   summary.txn_key.clear();
+   summary.txn_value.clear();
+
+   for (const auto& sub_request : batch_request.user_requests()) {
+     KVRequest kv_request;
+     if (!kv_request.ParseFromString(sub_request.request().data())) {
+       break;
+     }
+
+     const char* command = nullptr;
+     bool keep_value = false;
+     switch (kv_request.cmd()) {
+       case KVRequest::SET:
+         command = "SET";
+         keep_value = true;
+         break;
+       case KVRequest::GET:
+         command = "GET";
+         break;
+       case KVRequest::GETALLVALUES:
+         command = "GETALLVALUES";
+         break;
+       case KVRequest::GETRANGE:
+         command = "GETRANGE";
+         keep_value = true;
+         break;
+       default:
+         break;
+     }
+     if (command == nullptr) {
+       continue;
+     }
+
+     summary.txn_command.push_back(command);
+     summary.txn_key.push_back(kv_request.key());
+     summary.txn_value.push_back(keep_value ? kv_request.value() : "");
+   }
+ }
+
+ // Finalizes the ResView record for the transaction being tracked:
+ // stamps execution time, serializes the summary to JSON, stores it in
+ // consensus_history_ keyed by txn number, logs it, and resets the
+ // per-transaction timestamps. No-op unless ResView is enabled.
+ void Stats::SendSummary() {
+   if (!enable_resview) {
+     return;
+   }
+   transaction_summary_.execution_time = std::chrono::system_clock::now();
+
+   // Convert Transaction Summary to JSON
+   summary_json_["replica_id"] = transaction_summary_.replica_id;
+   summary_json_["ip"] = transaction_summary_.ip;
+   summary_json_["port"] = transaction_summary_.port;
+   summary_json_["primary_id"] = transaction_summary_.primary_id;
+   summary_json_["propose_pre_prepare_time"] =
+       transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+           .count();
+   summary_json_["prepare_time"] =
+       transaction_summary_.prepare_state_time.time_since_epoch().count();
+   summary_json_["commit_time"] =
+       transaction_summary_.commit_state_time.time_since_epoch().count();
+   summary_json_["execution_time"] =
+       transaction_summary_.execution_time.time_since_epoch().count();
+   for (const auto& stamp :
+        transaction_summary_.prepare_message_count_times_list) {
+     summary_json_["prepare_message_timestamps"].push_back(
+         stamp.time_since_epoch().count());
+   }
+   for (const auto& stamp :
+        transaction_summary_.commit_message_count_times_list) {
+     summary_json_["commit_message_timestamps"].push_back(
+         stamp.time_since_epoch().count());
+   }
+   summary_json_["txn_number"] = transaction_summary_.txn_number;
+   for (const auto& command : transaction_summary_.txn_command) {
+     summary_json_["txn_commands"].push_back(command);
+   }
+   for (const auto& key : transaction_summary_.txn_key) {
+     summary_json_["txn_keys"].push_back(key);
+   }
+   for (const auto& value : transaction_summary_.txn_value) {
+     summary_json_["txn_values"].push_back(value);
+   }
+
+   summary_json_["ext_cache_hit_ratio"] =
+       transaction_summary_.ext_cache_hit_ratio_;
+   // NOTE(review): consensus_history_ gains one entry per transaction and is
+   // never trimmed. It is also read by the /consensus_data handler on the
+   // HTTP thread without synchronization.
+   consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+       summary_json_;
+
+   LOG(ERROR) << summary_json_.dump();
+
+   // Reset Transaction Summary Parameters
+   transaction_summary_.request_pre_prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.prepare_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.commit_state_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.execution_time =
+       std::chrono::system_clock::time_point::min();
+   transaction_summary_.prepare_message_count_times_list.clear();
+   transaction_summary_.commit_message_count_times_list.clear();
+
+   summary_json_.clear();
+ }
+
+ // Background loop started by the constructor. Every monitor_sleep_time_
+ // seconds it snapshots the counters and logs how much each grew during the
+ // interval, plus per-second request rates. When AddLatency() recorded new
+ // samples, it also logs their average latency in seconds. Returns once stop_
+ // is observed after a sleep.
+ void Stats::MonitorGlobal() {
+   LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
+
+   // Divisor for the per-second rates ("txn:", "total geo request per:").
+   // This was hard-coded to 5, which is only right for the default interval
+   // (TEST_MODE forces 1s). Clamped to 1 so it can never be zero.
+   const uint64_t rate_divisor =
+       monitor_sleep_time_ > 0 ? static_cast<uint64_t>(monitor_sleep_time_) : 1;
+
+   // Counter values observed at the end of the previous interval.
+   uint64_t last_seq_fail = 0;
+   uint64_t last_num_client_req = 0, last_num_propose = 0,
+            last_num_prepare = 0, last_num_commit = 0;
+   uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
+   uint64_t last_client_call = 0, last_socket_recv = 0;
+   uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
+   uint64_t last_send_broad_cast_msg_per_rep = 0;
+   uint64_t last_server_call = 0, last_server_process = 0;
+   uint64_t last_total_request = 0, last_total_geo_request = 0,
+            last_geo_request = 0;
+   // ====== for client proxy ======
+   uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
+   // =============================
+   uint64_t time = 0;
+
+   while (!stop_) {
+     sleep(monitor_sleep_time_);
+     time += monitor_sleep_time_;
+
+     // Snapshot each counter once so the whole log line uses the same values.
+     const uint64_t seq_fail = seq_fail_;
+     const uint64_t socket_recv = socket_recv_;
+     const uint64_t client_call = client_call_;
+     const uint64_t num_client_req = num_client_req_;
+     const uint64_t num_propose = num_propose_;
+     const uint64_t num_prepare = num_prepare_;
+     const uint64_t num_commit = num_commit_;
+     const uint64_t pending_execute = pending_execute_;
+     const uint64_t execute = execute_;
+     const uint64_t execute_done = execute_done_;
+     const uint64_t broad_cast_msg = broad_cast_msg_;
+     const uint64_t send_broad_cast_msg = send_broad_cast_msg_;
+     const uint64_t send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
+     const uint64_t server_call = server_call_;
+     const uint64_t server_process = server_process_;
+     const uint64_t seq_gap = seq_gap_;
+     const uint64_t total_request = total_request_;
+     const uint64_t total_geo_request = total_geo_request_;
+     const uint64_t geo_request = geo_request_;
+     const uint64_t run_req_num = run_req_num_;
+     const uint64_t run_req_run_time = run_req_run_time_;
+
+     LOG(ERROR) << "=========== monitor =========\n"
+                << "server call:" << server_call - last_server_call
+                << " server process:" << server_process - last_server_process
+                << " socket recv:" << socket_recv - last_socket_recv
+                << " client call:" << client_call - last_client_call
+                << " client req:" << num_client_req - last_num_client_req
+                << " broad_cast:" << broad_cast_msg - last_broad_cast_msg
+                << " send broad_cast:"
+                << send_broad_cast_msg - last_send_broad_cast_msg
+                << " per send broad_cast:"
+                << send_broad_cast_msg_per_rep -
+                       last_send_broad_cast_msg_per_rep
+                << " propose:" << num_propose - last_num_propose
+                << " prepare:" << (num_prepare - last_num_prepare)
+                << " commit:" << (num_commit - last_num_commit)
+                << " pending execute:"
+                << pending_execute - last_pending_execute
+                << " execute:" << execute - last_execute
+                << " execute done:" << execute_done - last_execute_done
+                << " seq gap:" << seq_gap
+                << " total request:" << total_request - last_total_request
+                << " txn:"
+                << (total_request - last_total_request) / rate_divisor
+                << " total geo request:"
+                << total_geo_request - last_total_geo_request
+                << " total geo request per:"
+                << (total_geo_request - last_total_geo_request) / rate_divisor
+                << " geo request:" << (geo_request - last_geo_request)
+                << " seq fail:" << seq_fail - last_seq_fail
+                << " time:" << time
+                << " \n--------------- monitor ------------";
+     if (run_req_num - last_run_req_num > 0) {
+       // Average latency of this interval's samples, nanoseconds -> seconds.
+       LOG(ERROR) << " req client latency:"
+                  << static_cast<double>(run_req_run_time -
+                                         last_run_req_run_time) /
+                         (run_req_num - last_run_req_num) / 1000000000.0;
+     }
+
+     last_seq_fail = seq_fail;
+     last_socket_recv = socket_recv;
+     last_client_call = client_call;
+     last_num_client_req = num_client_req;
+     last_num_propose = num_propose;
+     last_num_prepare = num_prepare;
+     last_num_commit = num_commit;
+     last_pending_execute = pending_execute;
+     last_execute = execute;
+     last_execute_done = execute_done;
+
+     last_broad_cast_msg = broad_cast_msg;
+     last_send_broad_cast_msg = send_broad_cast_msg;
+     last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
+
+     last_server_call = server_call;
+     last_server_process = server_process;
+
+     last_run_req_num = run_req_num;
+     last_run_req_run_time = run_req_run_time;
+     last_total_request = total_request;
+     last_total_geo_request = total_geo_request;
+     last_geo_request = geo_request;
+   }
+ }
+
+ // Bumps the "client call" counter logged by MonitorGlobal(); mirrored to the
+ // Prometheus CLIENT_CALL metric when SetPrometheus() was called.
+ void Stats::IncClientCall() {
+   if (prometheus_ != nullptr) prometheus_->Inc(CLIENT_CALL, 1);
+   client_call_.fetch_add(1);
+ }
+
+ // Bumps the "client req" counter logged by MonitorGlobal(); mirrored to the
+ // Prometheus CLIENT_REQ metric when configured.
+ void Stats::IncClientRequest() {
+   if (prometheus_ != nullptr) prometheus_->Inc(CLIENT_REQ, 1);
+   num_client_req_.fetch_add(1);
+ }
+
+ // Bumps the "propose" counter logged by MonitorGlobal(); mirrored to the
+ // Prometheus PROPOSE metric when configured.
+ void Stats::IncPropose() {
+   if (prometheus_ != nullptr) prometheus_->Inc(PROPOSE, 1);
+   num_propose_.fetch_add(1);
+ }
+
+ // Counts one PREPARE message (mirrored to Prometheus PREPARE when
+ // configured). With ResView enabled, also records its arrival time for the
+ // next SendSummary().
+ void Stats::IncPrepare() {
+   if (prometheus_) {
+     prometheus_->Inc(PREPARE, 1);
+   }
+   num_prepare_++;
+   // SendSummary() only clears this list when ResView is enabled, so
+   // recording unconditionally grew the vector without bound otherwise.
+   if (enable_resview) {
+     transaction_summary_.prepare_message_count_times_list.push_back(
+         std::chrono::system_clock::now());
+   }
+ }
+
+ // Counts one COMMIT message (mirrored to Prometheus COMMIT when configured).
+ // With ResView enabled, also records its arrival time for the next
+ // SendSummary().
+ void Stats::IncCommit() {
+   if (prometheus_) {
+     prometheus_->Inc(COMMIT, 1);
+   }
+   num_commit_++;
+   // SendSummary() only clears this list when ResView is enabled, so
+   // recording unconditionally grew the vector without bound otherwise.
+   if (enable_resview) {
+     transaction_summary_.commit_message_count_times_list.push_back(
+         std::chrono::system_clock::now());
+   }
+ }
+
+ // Bumps the "pending execute" counter logged by MonitorGlobal().
+ void Stats::IncPendingExecute() {
+   pending_execute_.fetch_add(1);
+ }
+
+ // Bumps the "execute" counter logged by MonitorGlobal().
+ void Stats::IncExecute() {
+   execute_.fetch_add(1);
+ }
+
+ // Bumps the "execute done" counter logged by MonitorGlobal(); mirrored to
+ // the Prometheus EXECUTE metric when configured.
+ void Stats::IncExecuteDone() {
+   if (prometheus_ != nullptr) prometheus_->Inc(EXECUTE, 1);
+   execute_done_.fetch_add(1);
+ }
+
+ // Bumps the "broad_cast" counter logged by MonitorGlobal(); mirrored to the
+ // Prometheus BROAD_CAST metric when configured.
+ void Stats::BroadCastMsg() {
+   if (prometheus_ != nullptr) prometheus_->Inc(BROAD_CAST, 1);
+   broad_cast_msg_.fetch_add(1);
+ }
+
+ // Adds `num` to the "send broad_cast" counter logged by MonitorGlobal().
+ void Stats::SendBroadCastMsg(uint32_t num) {
+   send_broad_cast_msg_.fetch_add(num);
+ }
+
+ // Bumps the "per send broad_cast" counter logged by MonitorGlobal().
+ void Stats::SendBroadCastMsgPerRep() {
+   send_broad_cast_msg_per_rep_.fetch_add(1);
+ }
+
+ // Bumps the "seq fail" counter logged by MonitorGlobal().
+ void Stats::SeqFail() {
+   seq_fail_.fetch_add(1);
+ }
+
+ // Adds `num` executed requests to the "total request" counter logged by
+ // MonitorGlobal(); mirrored to Prometheus NUM_EXECUTE_TX when configured.
+ void Stats::IncTotalRequest(uint32_t num) {
+   if (prometheus_ != nullptr) prometheus_->Inc(NUM_EXECUTE_TX, num);
+   total_request_.fetch_add(num);
+ }
+
+ // Adds `num` to the "total geo request" counter logged by MonitorGlobal().
+ void Stats::IncTotalGeoRequest(uint32_t num) {
+   total_geo_request_.fetch_add(num);
+ }
+
+ // Bumps the "geo request" counter logged by MonitorGlobal().
+ void Stats::IncGeoRequest() {
+   geo_request_.fetch_add(1);
+ }
+
+ // Bumps the "server call" counter logged by MonitorGlobal(); mirrored to
+ // Prometheus SERVER_CALL_NAME when configured.
+ void Stats::ServerCall() {
+   if (prometheus_ != nullptr) prometheus_->Inc(SERVER_CALL_NAME, 1);
+   server_call_.fetch_add(1);
+ }
+
+ // Bumps the "server process" counter logged by MonitorGlobal(); mirrored to
+ // Prometheus SERVER_PROCESS when configured.
+ void Stats::ServerProcess() {
+   if (prometheus_ != nullptr) prometheus_->Inc(SERVER_PROCESS, 1);
+   server_process_.fetch_add(1);
+ }
+
+ // Records the latest sequence gap; MonitorGlobal() logs it as-is (not as a
+ // delta).
+ void Stats::SeqGap(uint64_t seq_gap) {
+   seq_gap_.store(seq_gap);
+ }
+
+ // Adds one latency sample. MonitorGlobal() divides the interval's summed
+ // run_time by the sample count and by 1e9 when logging, so `run_time`
+ // appears to be in nanoseconds.
+ void Stats::AddLatency(uint64_t run_time) {
+   run_req_num_.fetch_add(1);
+   run_req_run_time_.fetch_add(run_time);
+ }
+
+ // Creates (or replaces) the Prometheus exporter. Once set, the Inc*/Server*
+ // counters also report to it.
+ void Stats::SetPrometheus(const std::string& prometheus_address) {
+   auto handler = std::make_unique<PrometheusHandler>(prometheus_address);
+   prometheus_ = std::move(handler);
+ }
+
+ } // namespace resdb
\ No newline at end of file
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index a2524fc1..c0ad30a7 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -17,177 +17,151 @@
* under the License.
*/
-#pragma once
-
-#include <crow.h>
-
-#include <chrono>
-#include <future>
-#include <map>
-#include <nlohmann/json.hpp>
-
-#include "boost/asio.hpp"
-#include "boost/beast.hpp"
-#include "platform/common/network/tcp_socket.h"
-#include "platform/proto/resdb.pb.h"
-#include "platform/statistic/prometheus_handler.h"
-#include "proto/kv/kv.pb.h"
-#include "sys/resource.h"
-
-// Forward declaration for LevelDB
-namespace leveldb {
-class DB;
-}
-
-namespace asio = boost::asio;
-namespace beast = boost::beast;
-using tcp = asio::ip::tcp;
-
-namespace resdb {
-
-// Pre-revert ResView snapshot of the single transaction being tracked on
-// this replica, plus process and storage-engine metrics.
-struct VisualData {
- // Set when initializing
- int replica_id;
- int primary_id;
- std::string ip;
- int port;
-
- // Set when new txn is received
- int txn_number;
- std::vector<std::string> txn_command;
- std::vector<std::string> txn_key;
- std::vector<std::string> txn_value;
-
- // Request state if primary_id==replica_id, pre_prepare state otherwise
- std::chrono::system_clock::time_point request_pre_prepare_state_time;
- std::chrono::system_clock::time_point prepare_state_time;
- std::vector<std::chrono::system_clock::time_point>
- prepare_message_count_times_list;
- std::chrono::system_clock::time_point commit_state_time;
- std::vector<std::chrono::system_clock::time_point>
- commit_message_count_times_list;
- std::chrono::system_clock::time_point execution_time;
-
- // Storage Engine Stats
- double ext_cache_hit_ratio_;
- std::string level_db_stats_;
- std::string level_db_approx_mem_size_;
-
- // process stats
- struct rusage process_stats_;
-};
-
-// Pre-revert Stats interface. It includes the disk-backed timeline hooks
-// (Record*Recv/Execute*/ResponseSent, LevelDB summary store) that this change
-// removes. The WriteTimelineEvent declaration is rejoined onto a single line;
-// the mail client had wrapped it and dropped the diff prefix.
-class Stats {
- public:
- static Stats* GetGlobalStats(int sleep_seconds = 5);
-
- void Stop();
-
- void RetrieveProgress();
- void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
- bool faulty_flag);
- void SetPrimaryId(int primary_id);
- void SetStorageEngineMetrics(double ext_cache_hit_ratio,
- std::string level_db_stats,
- std::string level_db_approx_mem_size);
- void RecordStateTime(std::string state);
- void GetTransactionDetails(BatchUserRequest batch_request);
- void SendSummary();
- void CrowRoute();
- bool IsFaulty();
- void ChangePrimary(int primary_id);
-
- // Timeline event recording (disk-based, only when resview enabled)
- void RecordNetworkRecv(uint64_t seq);
- void RecordPrepareRecv(uint64_t seq, int sender_id);
- void RecordCommitRecv(uint64_t seq, int sender_id);
- void RecordExecuteStart(uint64_t seq);
- void RecordExecuteEnd(uint64_t seq);
- void RecordResponseSent(uint64_t seq);
- void CleanupOldTimelineEntries(); // Prevent timeline buffer memory leak
-
- void AddLatency(uint64_t run_time);
-
- void Monitor();
- void MonitorGlobal();
-
- void IncSocketRecv();
-
- void IncClientCall();
-
- void IncClientRequest();
- void IncPropose();
- void IncPrepare();
- void IncCommit();
- void IncPendingExecute();
- void IncExecute();
- void IncExecuteDone();
-
- void BroadCastMsg();
- void SendBroadCastMsg(uint32_t num);
- void SendBroadCastMsgPerRep();
- void SeqFail();
- void IncTotalRequest(uint32_t num);
- void IncTotalGeoRequest(uint32_t num);
- void IncGeoRequest();
-
- void SeqGap(uint64_t seq_gap);
- // Network in->worker
- void ServerCall();
- void ServerProcess();
- void SetPrometheus(const std::string& prometheus_address);
-
- protected:
- Stats(int sleep_time = 5);
- ~Stats();
-
- private:
- std::string monitor_port_ = "default";
- std::string name_;
- std::atomic<int> num_call_, run_call_;
- std::atomic<uint64_t> last_time_, run_time_, run_call_time_;
- std::thread thread_;
- std::atomic<bool> begin_;
- std::atomic<bool> stop_;
- std::condition_variable cv_;
- std::mutex mutex_;
-
- std::thread global_thread_;
- std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
- num_commit_, pending_execute_, execute_, execute_done_;
- std::atomic<uint64_t> client_call_, socket_recv_;
- std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_,
- send_broad_cast_msg_per_rep_;
- std::atomic<uint64_t> seq_fail_;
- std::atomic<uint64_t> server_call_, server_process_;
- std::atomic<uint64_t> run_req_num_;
- std::atomic<uint64_t> run_req_run_time_;
- std::atomic<uint64_t> seq_gap_;
- std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
- int monitor_sleep_time_ = 5; // default 5s.
-
- std::thread crow_thread_;
- bool enable_resview;
- bool enable_faulty_switch_;
- VisualData transaction_summary_;
- std::atomic<bool> make_faulty_;
- std::atomic<uint64_t> prev_num_prepare_;
- std::atomic<uint64_t> prev_num_commit_;
- nlohmann::json summary_json_;
-
- std::unique_ptr<PrometheusHandler> prometheus_;
-
- std::unique_ptr<leveldb::DB> summary_db_;
- std::mutex summary_db_mutex_;
-
- // In-memory timeline event buffer (per-seq) - batched writes to LevelDB
- std::map<uint64_t, std::vector<nlohmann::json>> timeline_buffer_;
- std::mutex timeline_buffer_mutex_;
-
- // Timeline directory for per-transaction event logs
- std::string timeline_dir_;
- std::mutex timeline_mutex_;
- void WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id = -1);
-};
-
-} // namespace resdb
+ #pragma once
+
+ #include <crow.h>
+
+ #include <chrono>
+ #include <future>
+ #include <nlohmann/json.hpp>
+
+ #include "boost/asio.hpp"
+ #include "boost/beast.hpp"
+ #include "platform/common/network/tcp_socket.h"
+ #include "platform/proto/resdb.pb.h"
+ #include "platform/statistic/prometheus_handler.h"
+ #include "proto/kv/kv.pb.h"
+ #include "sys/resource.h"
+
+ namespace asio = boost::asio;
+ namespace beast = boost::beast;
+ using tcp = asio::ip::tcp;
+
+ namespace resdb {
+
+ // ResView snapshot of the single transaction currently tracked on this
+ // replica, plus process and storage-engine metrics served over HTTP.
+ // Scalars have default initializers so that a fresh instance never exposes
+ // indeterminate values (e.g. ext_cache_hit_ratio_ and primary_id can be
+ // serialized before anything sets them).
+ struct VisualData {
+   // Set when initializing
+   int replica_id = 0;
+   int primary_id = 0;
+   std::string ip;
+   int port = -1;  // Matches the "not configured" value used by Stats::Stats.
+
+   // Set when new txn is received
+   int txn_number = 0;
+   std::vector<std::string> txn_command;
+   std::vector<std::string> txn_key;
+   std::vector<std::string> txn_value;
+
+   // Request state if primary_id==replica_id, pre_prepare state otherwise
+   std::chrono::system_clock::time_point request_pre_prepare_state_time;
+   std::chrono::system_clock::time_point prepare_state_time;
+   std::vector<std::chrono::system_clock::time_point>
+       prepare_message_count_times_list;
+   std::chrono::system_clock::time_point commit_state_time;
+   std::vector<std::chrono::system_clock::time_point>
+       commit_message_count_times_list;
+   std::chrono::system_clock::time_point execution_time;
+
+   // Storage Engine Stats
+   double ext_cache_hit_ratio_ = 0.0;
+   std::string level_db_stats_;
+   std::string level_db_approx_mem_size_;
+
+   // process stats (filled by getrusage in the /transaction_data handler)
+   struct rusage process_stats_ {};
+ };
+
+ // Process-wide metrics collector (singleton via GetGlobalStats()).
+ //  - Atomic counters, logged per interval by the MonitorGlobal() thread and
+ //    optionally mirrored to Prometheus.
+ //  - ResView telemetry for the transaction in transaction_summary_, served by
+ //    the CrowRoute() HTTP thread when enabled via SetProps().
+ // NOTE(review): transaction_summary_ holds a single transaction, so
+ // interleaved transactions overwrite each other's ResView telemetry.
+ class Stats {
+  public:
+   // Returns the singleton; `sleep_seconds` only applies on the first call.
+   static Stats* GetGlobalStats(int sleep_seconds = 5);
+
+   // Signals the background loops to exit (does not join).
+   void Stop();
+
+   // NOTE(review): RetrieveProgress(), Monitor() and IncSocketRecv() have no
+   // definition in stats.cpp as of this change; confirm they are unused.
+   void RetrieveProgress();
+   void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
+                 bool faulty_flag);
+   void SetPrimaryId(int primary_id);
+   void SetStorageEngineMetrics(double ext_cache_hit_ratio,
+                                std::string level_db_stats,
+                                std::string level_db_approx_mem_size);
+   void RecordStateTime(std::string state);
+   void GetTransactionDetails(BatchUserRequest batch_request);
+   void SendSummary();
+   void CrowRoute();
+   bool IsFaulty();
+   void ChangePrimary(int primary_id);
+
+   void AddLatency(uint64_t run_time);
+
+   void Monitor();
+   void MonitorGlobal();
+
+   void IncSocketRecv();
+
+   void IncClientCall();
+
+   void IncClientRequest();
+   void IncPropose();
+   void IncPrepare();
+   void IncCommit();
+   void IncPendingExecute();
+   void IncExecute();
+   void IncExecuteDone();
+
+   void BroadCastMsg();
+   void SendBroadCastMsg(uint32_t num);
+   void SendBroadCastMsgPerRep();
+   void SeqFail();
+   void IncTotalRequest(uint32_t num);
+   void IncTotalGeoRequest(uint32_t num);
+   void IncGeoRequest();
+
+   void SeqGap(uint64_t seq_gap);
+   // Network in->worker
+   void ServerCall();
+   void ServerProcess();
+   void SetPrometheus(const std::string& prometheus_address);
+
+  protected:
+   Stats(int sleep_time = 5);
+   ~Stats();
+
+  private:
+   // Every counter and flag has a default initializer, so no member is ever
+   // read indeterminate, whatever the constructor happens to assign.
+   std::string monitor_port_ = "default";
+   std::string name_;
+   std::atomic<int> num_call_{0}, run_call_{0};
+   std::atomic<uint64_t> last_time_{0}, run_time_{0}, run_call_time_{0};
+   std::thread thread_;
+   std::atomic<bool> begin_{false};
+   std::atomic<bool> stop_{false};
+   std::condition_variable cv_;
+   std::mutex mutex_;
+
+   std::thread global_thread_;
+   std::atomic<uint64_t> num_client_req_{0}, num_propose_{0}, num_prepare_{0},
+       num_commit_{0}, pending_execute_{0}, execute_{0}, execute_done_{0};
+   // socket_recv_ is logged by MonitorGlobal() but nothing defined in
+   // stats.cpp increments it.
+   std::atomic<uint64_t> client_call_{0}, socket_recv_{0};
+   std::atomic<uint64_t> broad_cast_msg_{0}, send_broad_cast_msg_{0},
+       send_broad_cast_msg_per_rep_{0};
+   std::atomic<uint64_t> seq_fail_{0};
+   std::atomic<uint64_t> server_call_{0}, server_process_{0};
+   std::atomic<uint64_t> run_req_num_{0};
+   std::atomic<uint64_t> run_req_run_time_{0};
+   std::atomic<uint64_t> seq_gap_{0};
+   std::atomic<uint64_t> total_request_{0}, total_geo_request_{0},
+       geo_request_{0};
+   int monitor_sleep_time_ = 5;  // default 5s.
+
+   std::thread crow_thread_;
+   bool enable_resview = false;
+   bool enable_faulty_switch_ = false;
+   VisualData transaction_summary_;
+   std::atomic<bool> make_faulty_{false};
+   std::atomic<uint64_t> prev_num_prepare_{0};
+   std::atomic<uint64_t> prev_num_commit_{0};
+   nlohmann::json summary_json_;
+   // txn number (as string) -> summary JSON; written by SendSummary(), read by
+   // the /consensus_data handler.
+   nlohmann::json consensus_history_;
+
+   std::unique_ptr<PrometheusHandler> prometheus_;
+ };
+
+ } // namespace resdb
\ No newline at end of file