This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit a3c1477e3aa84fffb6b6d2e509a5dd40cb498d31 Author: bchou9 <[email protected]> AuthorDate: Thu Jan 8 05:12:34 2026 +0000 reverting stats.cpp to base implemenation. TODO: per request telemetry tracking, currently gets overwritten leading to incorrect telemetry data --- .../consensus/execution/transaction_executor.cpp | 3 - platform/consensus/ordering/pbft/commitment.cpp | 2 - platform/networkstrate/service_network.cpp | 11 - platform/statistic/BUILD | 7 +- platform/statistic/stats.cpp | 1531 ++++++++------------ platform/statistic/stats.h | 322 ++-- 6 files changed, 735 insertions(+), 1141 deletions(-) diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index 71c228a0..bb7cf6be 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -303,7 +303,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, std::unique_ptr<BatchUserResponse> response; global_stats_->GetTransactionDetails(*batch_request_p); uint64_t seq = request->seq(); - global_stats_->RecordExecuteStart(seq); if (transaction_manager_ && need_execute) { if (execute_thread_num_ == 1) { response = transaction_manager_->ExecuteBatchWithSeq(request->seq(), @@ -356,7 +355,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, if (response == nullptr) { response = std::make_unique<BatchUserResponse>(); } - global_stats_->RecordExecuteEnd(seq); global_stats_->IncTotalRequest(batch_request_p->user_requests_size()); response->set_proxy_id(batch_request_p->proxy_id()); response->set_createtime(batch_request_p->createtime() + @@ -366,7 +364,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, response->set_seq(request->seq()); if (post_exec_func_) { post_exec_func_(std::move(request), std::move(response)); - global_stats_->RecordResponseSent(seq); } global_stats_->IncExecuteDone(); diff --git 
a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index f8ab3d9d..5a970dd6 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -266,7 +266,6 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, global_stats_->IncPrepare(); uint64_t seq = request->seq(); int sender_id = request->sender_id(); - global_stats_->RecordPrepareRecv(seq, sender_id); std::unique_ptr<Request> commit_request = resdb::NewRequest( Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id()); commit_request->mutable_data_signature()->Clear(); @@ -311,7 +310,6 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, std::move(request)); } global_stats_->IncCommit(); - global_stats_->RecordCommitRecv(seq, sender_id); // Add request to message_manager. // If it has received enough same requests(2f+1), message manager will // commit the request. diff --git a/platform/networkstrate/service_network.cpp b/platform/networkstrate/service_network.cpp index 0bc60579..3a05afda 100644 --- a/platform/networkstrate/service_network.cpp +++ b/platform/networkstrate/service_network.cpp @@ -74,17 +74,6 @@ void ServiceNetwork::AcceptorHandler(const char* buffer, size_t data_len) { item->data = std::move(sub_request_info); // LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data // len:"<<item->data->data_len; - - // Try to extract seq from request for timeline tracking - try { - Request request; - if (request.ParseFromArray(sub_data.data(), sub_data.size())) { - global_stats_->RecordNetworkRecv(request.seq()); - } - } catch (...) 
{ - // Ignore parse errors, seq extraction is optional - } - global_stats_->ServerCall(); input_queue_.Push(std::move(item)); } diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD index 042668e1..81cafe7f 100644 --- a/platform/statistic/BUILD +++ b/platform/statistic/BUILD @@ -26,10 +26,6 @@ cc_library( name = "stats", srcs = ["stats.cpp"], hdrs = ["stats.h"], - copts = select({ - "//chain/storage/setting:enable_leveldb_setting": ["-DENABLE_LEVELDB"], - "//conditions:default": [], - }), deps = [ ":prometheus_handler", "//common:asio", @@ -42,7 +38,6 @@ cc_library( "//proto/kv:kv_cc_proto", "//third_party:crow", "//third_party:prometheus", - "//third_party:leveldb", ], ) @@ -59,4 +54,4 @@ cc_library( cc_binary( name = "set_random_data", srcs = ["set_random_data.cpp"], -) +) \ No newline at end of file diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 5b4e3aec..1c511b3b 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -17,948 +17,589 @@ * under the License. 
*/ -#include "platform/statistic/stats.h" - -#include <glog/logging.h> - -#include <ctime> -#include <sstream> -#include <fstream> -#include <sys/stat.h> -#include <errno.h> - -#include "common/utils/utils.h" -#include "leveldb/db.h" -#include "leveldb/options.h" -#include "leveldb/write_batch.h" -#include "proto/kv/kv.pb.h" - -namespace asio = boost::asio; -namespace beast = boost::beast; -using tcp = asio::ip::tcp; - -namespace resdb { - -std::mutex g_mutex; -Stats* Stats::GetGlobalStats(int seconds) { - std::unique_lock<std::mutex> lk(g_mutex); - static Stats stats(seconds); - return &stats; -} // gets a singelton instance of Stats Class - -Stats::Stats(int sleep_time) { - monitor_sleep_time_ = sleep_time; -#ifdef TEST_MODE - monitor_sleep_time_ = 1; -#endif - num_call_ = 0; - num_commit_ = 0; - run_time_ = 0; - run_call_ = 0; - run_call_time_ = 0; - server_call_ = 0; - server_process_ = 0; - run_req_num_ = 0; - run_req_run_time_ = 0; - seq_gap_ = 0; - total_request_ = 0; - total_geo_request_ = 0; - geo_request_ = 0; - - stop_ = false; - begin_ = false; - - socket_recv_ = 0; - broad_cast_msg_ = 0; - send_broad_cast_msg_ = 0; - - prometheus_ = nullptr; - global_thread_ = - std::thread(&Stats::MonitorGlobal, this); // pass by reference - - transaction_summary_.port = -1; - - // Setup websocket here - make_faulty_.store(false); - transaction_summary_.request_pre_prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.commit_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.execution_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.txn_number = 0; -} - -void Stats::Stop() { stop_ = true; } - -Stats::~Stats() { - stop_ = true; - if (global_thread_.joinable()) { - global_thread_.join(); - } - if (enable_resview && crow_thread_.joinable()) { - crow_thread_.join(); - } -} - -int64_t 
GetRSS() { - int64_t rss = 0; - FILE* fp = NULL; - if ((fp = fopen("/proc/self/statm", "r")) == NULL) { - return 0; - } - - uint64_t size, resident, share, text, lib, data, dt; - if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, &text, - &lib, &data, &dt) != 7) { - fclose(fp); - return 0; - } - fclose(fp); - - int64_t page_size = sysconf(_SC_PAGESIZE); - rss = resident * page_size; - - // Convert to MB - rss = rss / (1024 * 1024); - - return rss; -} - -void Stats::CrowRoute() { - crow::SimpleApp app; - while (!stop_) { - try { - //Deprecated - CROW_ROUTE(app, "/consensus_data") - .methods("GET"_method)( - [this](const crow::request& req, crow::response& res) { - LOG(ERROR) << "API 1"; - res.set_header("Access-Control-Allow-Origin", "*"); - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); - res.set_header("Access-Control-Allow-Headers", - "Content-Type, Authorization"); - - int limit = 10; - int page = 1; - - std::string url = req.url; - size_t query_pos = url.find('?'); - if (query_pos != std::string::npos) { - std::string query_string = url.substr(query_pos + 1); - std::istringstream iss(query_string); - std::string param; - while (std::getline(iss, param, '&')) { - size_t eq_pos = param.find('='); - if (eq_pos != std::string::npos) { - std::string key = param.substr(0, eq_pos); - std::string value = param.substr(eq_pos + 1); - if (key == "limit") { - try { - limit = std::stoi(value); - if (limit <= 0) limit = 10; - } catch (...) { - limit = 10; - } - } else if (key == "page") { - try { - page = std::stoi(value); - if (page <= 0) page = 1; - } catch (...) 
{ - page = 1; - } - } - } - } - } - - nlohmann::json result; - int total_count = 0; - int offset = (page - 1) * limit; - int current_index = 0; - int items_collected = 0; - - if (summary_db_) { - std::lock_guard<std::mutex> lock(summary_db_mutex_); - leveldb::Iterator* it = - summary_db_->NewIterator(leveldb::ReadOptions()); - - // Count only non-timeline entries - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::string key = it->key().ToString(); - if (key.find("timeline_") != 0) { - total_count++; - } - } - - // Iterate and collect only non-timeline entries - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::string key = it->key().ToString(); - // Skip timeline entries - if (key.find("timeline_") == 0) { - continue; - } - - if (current_index >= offset && items_collected < limit) { - try { - result[key] = - nlohmann::json::parse(it->value().ToString()); - items_collected++; - } catch (...) { - res.code = 500; - result["error"] = "Failed to parse transaction data"; - delete it; - res.body = result.dump(); - res.end(); - return; - } - } - current_index++; - } - delete it; - } - - res.body = result.dump(); - res.end(); - }); - CROW_ROUTE(app, "/get_status") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 2"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body = IsFaulty() ? 
"Faulty" : "Not Faulty"; - res.end(); - }); - CROW_ROUTE(app, "/make_faulty") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 3"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - res.body = "Not Enabled"; - // Send your response - if (enable_faulty_switch_) { - make_faulty_.store(!make_faulty_.load()); - res.body = "Success"; - } - res.end(); - }); - CROW_ROUTE(app, "/transaction_data") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res) { - LOG(ERROR) << "API 4"; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - nlohmann::json mem_view_json; - int status = - getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_); - if (status == 0) { - mem_view_json["resident_set_size"] = GetRSS(); - mem_view_json["max_resident_set_size"] = - transaction_summary_.process_stats_.ru_maxrss; - mem_view_json["num_reads"] = - transaction_summary_.process_stats_.ru_inblock; - mem_view_json["num_writes"] = - transaction_summary_.process_stats_.ru_oublock; - } - - mem_view_json["ext_cache_hit_ratio"] = - transaction_summary_.ext_cache_hit_ratio_; - mem_view_json["level_db_stats"] = - transaction_summary_.level_db_stats_; - mem_view_json["level_db_approx_mem_size"] = - transaction_summary_.level_db_approx_mem_size_; - res.body = mem_view_json.dump(); - mem_view_json.clear(); - res.end(); - }); - CROW_ROUTE(app, "/consensus_data/<int>") - .methods("GET"_method)([this](const crow::request& req, 
- crow::response& res, int txn_number) { - LOG(ERROR) << "API 5: Get transaction " << txn_number; - res.set_header("Access-Control-Allow-Origin", - "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header( - "Access-Control-Allow-Headers", - "Content-Type, Authorization"); // Specify allowed headers - - nlohmann::json result; - if (summary_db_) { - std::lock_guard<std::mutex> lock(summary_db_mutex_); - std::string txn_key = std::to_string(txn_number); - std::string value; - leveldb::Status status = - summary_db_->Get(leveldb::ReadOptions(), txn_key, &value); - - if (status.ok() && !value.empty()) { - try { - result[txn_key] = nlohmann::json::parse(value); - } catch (...) { - res.code = 500; - result["error"] = "Failed to parse transaction data"; - } - } else { - res.code = 404; - result["error"] = "Transaction not found"; - result["txn_number"] = txn_number; - } - } else { - res.code = 503; - result["error"] = "Storage not initialized"; - } - - res.body = result.dump(); - res.end(); - }); - - CROW_ROUTE(app, "/transaction_timeline/<int>") - .methods("GET"_method)([this](const crow::request& req, - crow::response& res, int txn_number) { - LOG(ERROR) << "API 6: Get transaction timeline " << txn_number; - res.set_header("Access-Control-Allow-Origin", "*"); - res.set_header("Access-Control-Allow-Methods", - "GET, POST, OPTIONS"); - res.set_header("Access-Control-Allow-Headers", - "Content-Type, Authorization"); - - nlohmann::json result; - result["txn_number"] = txn_number; - - if (!summary_db_) { - res.code = 503; - result["error"] = "Storage not initialized"; - res.body = result.dump(); - res.end(); - return; - } - - std::lock_guard<std::mutex> lock(summary_db_mutex_); - std::string txn_key = std::to_string(txn_number); - std::string timeline_key = "timeline_" + txn_key; - - std::string txn_value; - leveldb::Status status = - summary_db_->Get(leveldb::ReadOptions(), 
txn_key, &txn_value); - - if (status.ok() && !txn_value.empty()) { - try { - nlohmann::json txn_summary = nlohmann::json::parse(txn_value); - nlohmann::json txn_details; - if (txn_summary.contains("txn_commands")) { - txn_details["txn_commands"] = txn_summary["txn_commands"]; - } - if (txn_summary.contains("txn_keys")) { - txn_details["txn_keys"] = txn_summary["txn_keys"]; - } - if (txn_summary.contains("txn_values")) { - txn_details["txn_values"] = txn_summary["txn_values"]; - } - if (txn_summary.contains("propose_pre_prepare_time")) { - txn_details["propose_pre_prepare_time"] = - txn_summary["propose_pre_prepare_time"]; - } - if (txn_summary.contains("prepare_time")) { - txn_details["prepare_time"] = txn_summary["prepare_time"]; - } - if (txn_summary.contains("commit_time")) { - txn_details["commit_time"] = txn_summary["commit_time"]; - } - if (txn_summary.contains("execution_time")) { - txn_details["execution_time"] = txn_summary["execution_time"]; - } - result["transaction_details"] = txn_details; - } catch (...) { - LOG(ERROR) << "Failed to parse transaction summary for txn: " - << txn_number; - } - } - - std::string timeline_value; - status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, &timeline_value); - - if (status.ok() && !timeline_value.empty()) { - try { - nlohmann::json events_array = nlohmann::json::parse(timeline_value); - result["timeline"] = events_array; - } catch (...) 
{ - LOG(ERROR) << "Failed to parse timeline for txn: " << txn_number; - } - } else { - LOG(ERROR) << "Timeline not found for txn: " << txn_number; - } - - res.body = result.dump(); - res.end(); - }); - app.port(8500 + transaction_summary_.port).multithreaded().run(); - sleep(1); - } catch (const std::exception& e) { - } - } - app.stop(); -} - -bool Stats::IsFaulty() { return make_faulty_.load(); } - -void Stats::ChangePrimary(int primary_id) { - transaction_summary_.primary_id = primary_id; - make_faulty_.store(false); -} - -void Stats::SetProps(int replica_id, std::string ip, int port, - bool resview_flag, bool faulty_flag) { - transaction_summary_.replica_id = replica_id; - transaction_summary_.ip = ip; - transaction_summary_.port = port; - enable_resview = resview_flag; - enable_faulty_switch_ = faulty_flag; - if (resview_flag) { - // Single data directory for both summaries and timeline - std::string data_dir = "./resdb_data_" + std::to_string(port); - int mkdir_result = mkdir(data_dir.c_str(), 0755); - if (mkdir_result != 0 && errno != EEXIST) { - LOG(ERROR) << "Failed to create data directory: " << data_dir; - } - - // Initialize LevelDB for both transaction summaries and timeline events - std::string summary_path = data_dir + "/summaries"; - leveldb::Options options; - options.create_if_missing = true; - leveldb::DB* db = nullptr; - leveldb::Status status = leveldb::DB::Open(options, summary_path, &db); - if (status.ok()) { - summary_db_.reset(db); - LOG(INFO) << "Initialized LevelDB storage at: " << summary_path; - } else { - LOG(ERROR) << "Failed to open LevelDB at " << summary_path << ": " - << status.ToString(); - } - - crow_thread_ = std::thread(&Stats::CrowRoute, this); - } -} - -void Stats::SetPrimaryId(int primary_id) { - transaction_summary_.primary_id = primary_id; -} - -void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio, - std::string level_db_stats, - std::string level_db_approx_mem_size) { - transaction_summary_.ext_cache_hit_ratio_ 
= ext_cache_hit_ratio; - transaction_summary_.level_db_stats_ = level_db_stats; - transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size; -} - -void Stats::RecordStateTime(std::string state) { - if (!enable_resview) { - return; - } - if (state == "request" || state == "pre-prepare") { - transaction_summary_.request_pre_prepare_state_time = - std::chrono::system_clock::now(); - } else if (state == "prepare") { - transaction_summary_.prepare_state_time = std::chrono::system_clock::now(); - } else if (state == "commit") { - transaction_summary_.commit_state_time = std::chrono::system_clock::now(); - } -} - -void Stats::GetTransactionDetails(BatchUserRequest batch_request) { - if (!enable_resview) { - return; - } - transaction_summary_.txn_number = batch_request.seq(); - transaction_summary_.txn_command.clear(); - transaction_summary_.txn_key.clear(); - transaction_summary_.txn_value.clear(); - for (auto& sub_request : batch_request.user_requests()) { - KVRequest kv_request; - if (!kv_request.ParseFromString(sub_request.request().data())) { - break; - } - if (kv_request.cmd() == KVRequest::SET) { - transaction_summary_.txn_command.push_back("SET"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(kv_request.value()); - } else if (kv_request.cmd() == KVRequest::GET) { - transaction_summary_.txn_command.push_back("GET"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(""); - } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { - transaction_summary_.txn_command.push_back("GETALLVALUES"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(""); - } else if (kv_request.cmd() == KVRequest::GETRANGE) { - transaction_summary_.txn_command.push_back("GETRANGE"); - transaction_summary_.txn_key.push_back(kv_request.key()); - transaction_summary_.txn_value.push_back(kv_request.value()); - } - } -} - 
-void Stats::SendSummary() { - if (!enable_resview) { - return; - } - transaction_summary_.execution_time = std::chrono::system_clock::now(); - - // Convert Transaction Summary to JSON - summary_json_["replica_id"] = transaction_summary_.replica_id; - summary_json_["ip"] = transaction_summary_.ip; - summary_json_["port"] = transaction_summary_.port; - summary_json_["primary_id"] = transaction_summary_.primary_id; - summary_json_["propose_pre_prepare_time"] = - transaction_summary_.request_pre_prepare_state_time.time_since_epoch() - .count(); - summary_json_["prepare_time"] = - transaction_summary_.prepare_state_time.time_since_epoch().count(); - summary_json_["commit_time"] = - transaction_summary_.commit_state_time.time_since_epoch().count(); - summary_json_["execution_time"] = - transaction_summary_.execution_time.time_since_epoch().count(); - for (size_t i = 0; - i < transaction_summary_.prepare_message_count_times_list.size(); i++) { - summary_json_["prepare_message_timestamps"].push_back( - transaction_summary_.prepare_message_count_times_list[i] - .time_since_epoch() - .count()); - } - for (size_t i = 0; - i < transaction_summary_.commit_message_count_times_list.size(); i++) { - summary_json_["commit_message_timestamps"].push_back( - transaction_summary_.commit_message_count_times_list[i] - .time_since_epoch() - .count()); - } - summary_json_["txn_number"] = transaction_summary_.txn_number; - for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) { - summary_json_["txn_commands"].push_back( - transaction_summary_.txn_command[i]); - } - for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) { - summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]); - } - for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) { - summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]); - } - - summary_json_["ext_cache_hit_ratio"] = - transaction_summary_.ext_cache_hit_ratio_; - - std::string txn_key = 
std::to_string(transaction_summary_.txn_number); - - if (summary_db_) { - std::lock_guard<std::mutex> lock(summary_db_mutex_); - - // Write transaction summary - leveldb::Status status = summary_db_->Put(leveldb::WriteOptions(), txn_key, - summary_json_.dump()); - if (!status.ok()) { - LOG(ERROR) << "Failed to write summary to storage for txn: " << txn_key - << ": " << status.ToString(); - } - - // Batch write all buffered timeline events for this transaction - { - std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_); - auto it = timeline_buffer_.find(transaction_summary_.txn_number); - if (it != timeline_buffer_.end() && !it->second.empty()) { - std::string timeline_key = "timeline_" + txn_key; - - // Read existing timeline events if they exist, then merge - nlohmann::json events_array; - std::string existing_value; - leveldb::Status read_status = summary_db_->Get(leveldb::ReadOptions(), timeline_key, &existing_value); - if (read_status.ok() && !existing_value.empty()) { - try { - events_array = nlohmann::json::parse(existing_value); - // Ensure it's an array - if (!events_array.is_array()) { - events_array = nlohmann::json::array(); - } - } catch (...) 
{ - // If parsing fails, start with empty array - events_array = nlohmann::json::array(); - } - } else { - events_array = nlohmann::json::array(); - } - - // Append new events to existing array - for (const auto& event : it->second) { - events_array.push_back(event); - } - - // Write merged timeline events back to LevelDB - status = summary_db_->Put(leveldb::WriteOptions(), timeline_key, - events_array.dump()); - if (!status.ok()) { - LOG(ERROR) << "Failed to write timeline to storage for txn: " << txn_key - << ": " << status.ToString(); - } - // Clean up buffer for this txn - timeline_buffer_.erase(it); - } - } - } - - LOG(ERROR) << summary_json_.dump(); - - // Reset Transaction Summary Parameters - transaction_summary_.request_pre_prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.commit_state_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.execution_time = - std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_message_count_times_list.clear(); - transaction_summary_.commit_message_count_times_list.clear(); - - summary_json_.clear(); -} - -void Stats::WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id) { - if (!enable_resview) { - return; - } - - // Buffer timeline events in memory - written to LevelDB in batch during SendSummary() - std::lock_guard<std::mutex> lock(timeline_buffer_mutex_); - - // Create new event - nlohmann::json event; - event["timestamp"] = std::chrono::system_clock::now().time_since_epoch().count(); - event["phase"] = phase; - if (sender_id >= 0) { - event["sender_id"] = sender_id; - } - - // Append to in-memory buffer for this seq - timeline_buffer_[seq].push_back(event); -} - -void Stats::RecordNetworkRecv(uint64_t seq) { - WriteTimelineEvent(seq, "network_recv"); -} - -void Stats::RecordPrepareRecv(uint64_t seq, int sender_id) { - 
WriteTimelineEvent(seq, "prepare_recv", sender_id); -} - -void Stats::RecordCommitRecv(uint64_t seq, int sender_id) { - WriteTimelineEvent(seq, "commit_recv", sender_id); -} - -void Stats::RecordExecuteStart(uint64_t seq) { - WriteTimelineEvent(seq, "execute_start"); -} - -void Stats::RecordExecuteEnd(uint64_t seq) { - WriteTimelineEvent(seq, "execute_end"); -} - -void Stats::RecordResponseSent(uint64_t seq) { - WriteTimelineEvent(seq, "response_sent"); -} - -void Stats::CleanupOldTimelineEntries() { - // Periodically clean up old buffer entries to prevent memory leaks - // Keep only entries within the last 10000 sequence numbers - std::lock_guard<std::mutex> timeline_lock(timeline_buffer_mutex_); - - if (timeline_buffer_.empty()) { - return; - } - - // Get the highest sequence number we've seen - uint64_t max_seq = timeline_buffer_.rbegin()->first; - uint64_t cleanup_threshold = (max_seq > 10000) ? (max_seq - 10000) : 0; - - // Remove all entries below the threshold - auto it = timeline_buffer_.begin(); - while (it != timeline_buffer_.end() && it->first < cleanup_threshold) { - it = timeline_buffer_.erase(it); - } - - size_t buffer_size = timeline_buffer_.size(); - if (buffer_size > 1000) { - LOG(WARNING) << "Timeline buffer size is large: " << buffer_size - << " entries. 
This may indicate transactions not completing."; - } -} - -void Stats::MonitorGlobal() { - LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_; - - uint64_t seq_fail = 0; - uint64_t client_call = 0, socket_recv = 0; - uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0, - pending_execute = 0, execute = 0, execute_done = 0; - uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0; - uint64_t send_broad_cast_msg_per_rep = 0; - uint64_t server_call = 0, server_process = 0; - uint64_t seq_gap = 0; - uint64_t total_request = 0, total_geo_request = 0, geo_request = 0; - - // ====== for client proxy ====== - uint64_t run_req_num = 0, run_req_run_time = 0; - - uint64_t last_run_req_num = 0, last_run_req_run_time = 0; - // ============================= - - uint64_t last_seq_fail = 0; - uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0, - last_num_commit = 0; - uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0; - uint64_t last_client_call = 0, last_socket_recv = 0; - uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0; - uint64_t last_send_broad_cast_msg_per_rep = 0; - uint64_t last_server_call = 0, last_server_process = 0; - uint64_t last_total_request = 0, last_total_geo_request = 0, - last_geo_request = 0; - uint64_t time = 0; - - while (!stop_) { - sleep(monitor_sleep_time_); - time += monitor_sleep_time_; - - // Periodically clean up old timeline buffer entries to prevent memory leaks - CleanupOldTimelineEntries(); - - seq_fail = seq_fail_; - socket_recv = socket_recv_; - client_call = client_call_; - num_client_req = num_client_req_; - num_propose = num_propose_; - num_prepare = num_prepare_; - num_commit = num_commit_; - pending_execute = pending_execute_; - execute = execute_; - execute_done = execute_done_; - broad_cast_msg = broad_cast_msg_; - send_broad_cast_msg = send_broad_cast_msg_; - send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_; - 
server_call = server_call_; - server_process = server_process_; - seq_gap = seq_gap_; - total_request = total_request_; - total_geo_request = total_geo_request_; - geo_request = geo_request_; - - run_req_num = run_req_num_; - run_req_run_time = run_req_run_time_; - - LOG(ERROR) << "=========== monitor =========\n" - << "server call:" << server_call - last_server_call - << " server process:" << server_process - last_server_process - << " socket recv:" << socket_recv - last_socket_recv - << " " - "client call:" - << client_call - last_client_call - << " " - "client req:" - << num_client_req - last_num_client_req - << " " - "broad_cast:" - << broad_cast_msg - last_broad_cast_msg - << " " - "send broad_cast:" - << send_broad_cast_msg - last_send_broad_cast_msg - << " " - "per send broad_cast:" - << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep - << " " - "propose:" - << num_propose - last_num_propose - << " " - "prepare:" - << (num_prepare - last_num_prepare) - << " " - "commit:" - << (num_commit - last_num_commit) - << " " - "pending execute:" - << pending_execute - last_pending_execute - << " " - "execute:" - << execute - last_execute - << " " - "execute done:" - << execute_done - last_execute_done << " seq gap:" << seq_gap - << " total request:" << total_request - last_total_request - << " txn:" << (total_request - last_total_request) / 5 - << " total geo request:" - << total_geo_request - last_total_geo_request - << " total geo request per:" - << (total_geo_request - last_total_geo_request) / 5 - << " geo request:" << (geo_request - last_geo_request) - << " " - "seq fail:" - << seq_fail - last_seq_fail << " time:" << time - << " " - "\n--------------- monitor ------------"; - if (run_req_num - last_run_req_num > 0) { - LOG(ERROR) << " req client latency:" - << static_cast<double>(run_req_run_time - - last_run_req_run_time) / - (run_req_num - last_run_req_num) / 1000000000.0; - } - - last_seq_fail = seq_fail; - last_socket_recv = socket_recv; - 
last_client_call = client_call; - last_num_client_req = num_client_req; - last_num_propose = num_propose; - last_num_prepare = num_prepare; - last_num_commit = num_commit; - last_pending_execute = pending_execute; - last_execute = execute; - last_execute_done = execute_done; - - last_broad_cast_msg = broad_cast_msg; - last_send_broad_cast_msg = send_broad_cast_msg; - last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep; - - last_server_call = server_call; - last_server_process = server_process; - - last_run_req_num = run_req_num; - last_run_req_run_time = run_req_run_time; - last_total_request = total_request; - last_total_geo_request = total_geo_request; - last_geo_request = geo_request; - } -} - -void Stats::IncClientCall() { - if (prometheus_) { - prometheus_->Inc(CLIENT_CALL, 1); - } - client_call_++; -} - -void Stats::IncClientRequest() { - if (prometheus_) { - prometheus_->Inc(CLIENT_REQ, 1); - } - num_client_req_++; -} - -void Stats::IncPropose() { - if (prometheus_) { - prometheus_->Inc(PROPOSE, 1); - } - num_propose_++; -} - -void Stats::IncPrepare() { - if (prometheus_) { - prometheus_->Inc(PREPARE, 1); - } - num_prepare_++; - if (enable_resview) { - transaction_summary_.prepare_message_count_times_list.push_back( - std::chrono::system_clock::now()); - } -} - -void Stats::IncCommit() { - if (prometheus_) { - prometheus_->Inc(COMMIT, 1); - } - num_commit_++; - if (enable_resview) { - transaction_summary_.commit_message_count_times_list.push_back( - std::chrono::system_clock::now()); - } -} - -void Stats::IncPendingExecute() { pending_execute_++; } - -void Stats::IncExecute() { execute_++; } - -void Stats::IncExecuteDone() { - if (prometheus_) { - prometheus_->Inc(EXECUTE, 1); - } - execute_done_++; -} - -void Stats::BroadCastMsg() { - if (prometheus_) { - prometheus_->Inc(BROAD_CAST, 1); - } - broad_cast_msg_++; -} - -void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; } - -void Stats::SendBroadCastMsgPerRep() { 
send_broad_cast_msg_per_rep_++; } - -void Stats::SeqFail() { seq_fail_++; } - -void Stats::IncTotalRequest(uint32_t num) { - if (prometheus_) { - prometheus_->Inc(NUM_EXECUTE_TX, num); - } - total_request_ += num; -} - -void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; } - -void Stats::IncGeoRequest() { geo_request_++; } - -void Stats::ServerCall() { - if (prometheus_) { - prometheus_->Inc(SERVER_CALL_NAME, 1); - } - server_call_++; -} - -void Stats::ServerProcess() { - if (prometheus_) { - prometheus_->Inc(SERVER_PROCESS, 1); - } - server_process_++; -} - -void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; } - -void Stats::AddLatency(uint64_t run_time) { - run_req_num_++; - run_req_run_time_ += run_time; -} - -void Stats::SetPrometheus(const std::string& prometheus_address) { - prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address); -} - -} // namespace resdb + #include "platform/statistic/stats.h" + + #include <glog/logging.h> + + #include <ctime> + + #include "common/utils/utils.h" + #include "proto/kv/kv.pb.h" + + namespace asio = boost::asio; + namespace beast = boost::beast; + using tcp = asio::ip::tcp; + + namespace resdb { + + std::mutex g_mutex; + Stats* Stats::GetGlobalStats(int seconds) { + std::unique_lock<std::mutex> lk(g_mutex); + static Stats stats(seconds); + return &stats; + } // gets a singelton instance of Stats Class + + Stats::Stats(int sleep_time) { + monitor_sleep_time_ = sleep_time; + #ifdef TEST_MODE + monitor_sleep_time_ = 1; + #endif + num_call_ = 0; + num_commit_ = 0; + run_time_ = 0; + run_call_ = 0; + run_call_time_ = 0; + server_call_ = 0; + server_process_ = 0; + run_req_num_ = 0; + run_req_run_time_ = 0; + seq_gap_ = 0; + total_request_ = 0; + total_geo_request_ = 0; + geo_request_ = 0; + + stop_ = false; + begin_ = false; + + socket_recv_ = 0; + broad_cast_msg_ = 0; + send_broad_cast_msg_ = 0; + + prometheus_ = nullptr; + global_thread_ = + std::thread(&Stats::MonitorGlobal, this); // 
pass by reference + + transaction_summary_.port = -1; + + // Setup websocket here + make_faulty_.store(false); + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.commit_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.execution_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.txn_number = 0; + } + + void Stats::Stop() { stop_ = true; } + + Stats::~Stats() { + stop_ = true; + if (global_thread_.joinable()) { + global_thread_.join(); + } + if (enable_resview && crow_thread_.joinable()) { + crow_thread_.join(); + } + } + + int64_t GetRSS() { + int64_t rss = 0; + FILE* fp = NULL; + if ((fp = fopen("/proc/self/statm", "r")) == NULL) { + return 0; + } + + uint64_t size, resident, share, text, lib, data, dt; + if (fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size, &resident, &share, &text, + &lib, &data, &dt) != 7) { + fclose(fp); + return 0; + } + fclose(fp); + + int64_t page_size = sysconf(_SC_PAGESIZE); + rss = resident * page_size; + + // Convert to MB + rss = rss / (1024 * 1024); + + return rss; + } + + void Stats::CrowRoute() { + crow::SimpleApp app; + while (!stop_) { + try { + CROW_ROUTE(app, "/consensus_data") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 1"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + res.body = consensus_history_.dump(); + res.end(); + }); + CROW_ROUTE(app, "/get_status") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 2"; + 
res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + res.body = IsFaulty() ? "Faulty" : "Not Faulty"; + res.end(); + }); + CROW_ROUTE(app, "/make_faulty") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 3"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + if (enable_faulty_switch_) { + make_faulty_.store(!make_faulty_.load()); + } + res.body = "Success"; + res.end(); + }); + CROW_ROUTE(app, "/transaction_data") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 4"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + nlohmann::json mem_view_json; + int status = + getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_); + if (status == 0) { + mem_view_json["resident_set_size"] = GetRSS(); + mem_view_json["max_resident_set_size"] = + transaction_summary_.process_stats_.ru_maxrss; + mem_view_json["num_reads"] = + transaction_summary_.process_stats_.ru_inblock; + mem_view_json["num_writes"] = + transaction_summary_.process_stats_.ru_oublock; + } + + mem_view_json["ext_cache_hit_ratio"] = + transaction_summary_.ext_cache_hit_ratio_; + 
mem_view_json["level_db_stats"] = + transaction_summary_.level_db_stats_; + mem_view_json["level_db_approx_mem_size"] = + transaction_summary_.level_db_approx_mem_size_; + res.body = mem_view_json.dump(); + mem_view_json.clear(); + res.end(); + }); + app.port(8500 + transaction_summary_.port).multithreaded().run(); + sleep(1); + } catch (const std::exception& e) { + } + } + app.stop(); + } + + bool Stats::IsFaulty() { return make_faulty_.load(); } + + void Stats::ChangePrimary(int primary_id) { + transaction_summary_.primary_id = primary_id; + make_faulty_.store(false); + } + + void Stats::SetProps(int replica_id, std::string ip, int port, + bool resview_flag, bool faulty_flag) { + transaction_summary_.replica_id = replica_id; + transaction_summary_.ip = ip; + transaction_summary_.port = port; + enable_resview = resview_flag; + enable_faulty_switch_ = faulty_flag; + if (resview_flag) { + crow_thread_ = std::thread(&Stats::CrowRoute, this); + } + } + + void Stats::SetPrimaryId(int primary_id) { + transaction_summary_.primary_id = primary_id; + } + + void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio, + std::string level_db_stats, + std::string level_db_approx_mem_size) { + transaction_summary_.ext_cache_hit_ratio_ = ext_cache_hit_ratio; + transaction_summary_.level_db_stats_ = level_db_stats; + transaction_summary_.level_db_approx_mem_size_ = level_db_approx_mem_size; + } + + void Stats::RecordStateTime(std::string state) { + if (!enable_resview) { + return; + } + if (state == "request" || state == "pre-prepare") { + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::now(); + } else if (state == "prepare") { + transaction_summary_.prepare_state_time = std::chrono::system_clock::now(); + } else if (state == "commit") { + transaction_summary_.commit_state_time = std::chrono::system_clock::now(); + } + } + + void Stats::GetTransactionDetails(BatchUserRequest batch_request) { + if (!enable_resview) { + return; + } + 
transaction_summary_.txn_number = batch_request.seq(); + transaction_summary_.txn_command.clear(); + transaction_summary_.txn_key.clear(); + transaction_summary_.txn_value.clear(); + for (auto& sub_request : batch_request.user_requests()) { + KVRequest kv_request; + if (!kv_request.ParseFromString(sub_request.request().data())) { + break; + } + if (kv_request.cmd() == KVRequest::SET) { + transaction_summary_.txn_command.push_back("SET"); + transaction_summary_.txn_key.push_back(kv_request.key()); + transaction_summary_.txn_value.push_back(kv_request.value()); + } else if (kv_request.cmd() == KVRequest::GET) { + transaction_summary_.txn_command.push_back("GET"); + transaction_summary_.txn_key.push_back(kv_request.key()); + transaction_summary_.txn_value.push_back(""); + } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { + transaction_summary_.txn_command.push_back("GETALLVALUES"); + transaction_summary_.txn_key.push_back(kv_request.key()); + transaction_summary_.txn_value.push_back(""); + } else if (kv_request.cmd() == KVRequest::GETRANGE) { + transaction_summary_.txn_command.push_back("GETRANGE"); + transaction_summary_.txn_key.push_back(kv_request.key()); + transaction_summary_.txn_value.push_back(kv_request.value()); + } + } + } + + void Stats::SendSummary() { + if (!enable_resview) { + return; + } + transaction_summary_.execution_time = std::chrono::system_clock::now(); + + // Convert Transaction Summary to JSON + summary_json_["replica_id"] = transaction_summary_.replica_id; + summary_json_["ip"] = transaction_summary_.ip; + summary_json_["port"] = transaction_summary_.port; + summary_json_["primary_id"] = transaction_summary_.primary_id; + summary_json_["propose_pre_prepare_time"] = + transaction_summary_.request_pre_prepare_state_time.time_since_epoch() + .count(); + summary_json_["prepare_time"] = + transaction_summary_.prepare_state_time.time_since_epoch().count(); + summary_json_["commit_time"] = + 
transaction_summary_.commit_state_time.time_since_epoch().count(); + summary_json_["execution_time"] = + transaction_summary_.execution_time.time_since_epoch().count(); + for (size_t i = 0; + i < transaction_summary_.prepare_message_count_times_list.size(); i++) { + summary_json_["prepare_message_timestamps"].push_back( + transaction_summary_.prepare_message_count_times_list[i] + .time_since_epoch() + .count()); + } + for (size_t i = 0; + i < transaction_summary_.commit_message_count_times_list.size(); i++) { + summary_json_["commit_message_timestamps"].push_back( + transaction_summary_.commit_message_count_times_list[i] + .time_since_epoch() + .count()); + } + summary_json_["txn_number"] = transaction_summary_.txn_number; + for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) { + summary_json_["txn_commands"].push_back( + transaction_summary_.txn_command[i]); + } + for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) { + summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]); + } + for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) { + summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]); + } + + summary_json_["ext_cache_hit_ratio"] = + transaction_summary_.ext_cache_hit_ratio_; + consensus_history_[std::to_string(transaction_summary_.txn_number)] = + summary_json_; + + LOG(ERROR) << summary_json_.dump(); + + // Reset Transaction Summary Parameters + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.commit_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.execution_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.prepare_message_count_times_list.clear(); + transaction_summary_.commit_message_count_times_list.clear(); + + summary_json_.clear(); + } + + void 
Stats::MonitorGlobal() { + LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_; + + uint64_t seq_fail = 0; + uint64_t client_call = 0, socket_recv = 0; + uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0, + pending_execute = 0, execute = 0, execute_done = 0; + uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0; + uint64_t send_broad_cast_msg_per_rep = 0; + uint64_t server_call = 0, server_process = 0; + uint64_t seq_gap = 0; + uint64_t total_request = 0, total_geo_request = 0, geo_request = 0; + + // ====== for client proxy ====== + uint64_t run_req_num = 0, run_req_run_time = 0; + + uint64_t last_run_req_num = 0, last_run_req_run_time = 0; + // ============================= + + uint64_t last_seq_fail = 0; + uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0, + last_num_commit = 0; + uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0; + uint64_t last_client_call = 0, last_socket_recv = 0; + uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0; + uint64_t last_send_broad_cast_msg_per_rep = 0; + uint64_t last_server_call = 0, last_server_process = 0; + uint64_t last_total_request = 0, last_total_geo_request = 0, + last_geo_request = 0; + uint64_t time = 0; + + while (!stop_) { + sleep(monitor_sleep_time_); + time += monitor_sleep_time_; + seq_fail = seq_fail_; + socket_recv = socket_recv_; + client_call = client_call_; + num_client_req = num_client_req_; + num_propose = num_propose_; + num_prepare = num_prepare_; + num_commit = num_commit_; + pending_execute = pending_execute_; + execute = execute_; + execute_done = execute_done_; + broad_cast_msg = broad_cast_msg_; + send_broad_cast_msg = send_broad_cast_msg_; + send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_; + server_call = server_call_; + server_process = server_process_; + seq_gap = seq_gap_; + total_request = total_request_; + total_geo_request = total_geo_request_; + geo_request = 
geo_request_; + + run_req_num = run_req_num_; + run_req_run_time = run_req_run_time_; + + LOG(ERROR) << "=========== monitor =========\n" + << "server call:" << server_call - last_server_call + << " server process:" << server_process - last_server_process + << " socket recv:" << socket_recv - last_socket_recv + << " " + "client call:" + << client_call - last_client_call + << " " + "client req:" + << num_client_req - last_num_client_req + << " " + "broad_cast:" + << broad_cast_msg - last_broad_cast_msg + << " " + "send broad_cast:" + << send_broad_cast_msg - last_send_broad_cast_msg + << " " + "per send broad_cast:" + << send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep + << " " + "propose:" + << num_propose - last_num_propose + << " " + "prepare:" + << (num_prepare - last_num_prepare) + << " " + "commit:" + << (num_commit - last_num_commit) + << " " + "pending execute:" + << pending_execute - last_pending_execute + << " " + "execute:" + << execute - last_execute + << " " + "execute done:" + << execute_done - last_execute_done << " seq gap:" << seq_gap + << " total request:" << total_request - last_total_request + << " txn:" << (total_request - last_total_request) / 5 + << " total geo request:" + << total_geo_request - last_total_geo_request + << " total geo request per:" + << (total_geo_request - last_total_geo_request) / 5 + << " geo request:" << (geo_request - last_geo_request) + << " " + "seq fail:" + << seq_fail - last_seq_fail << " time:" << time + << " " + "\n--------------- monitor ------------"; + if (run_req_num - last_run_req_num > 0) { + LOG(ERROR) << " req client latency:" + << static_cast<double>(run_req_run_time - + last_run_req_run_time) / + (run_req_num - last_run_req_num) / 1000000000.0; + } + + last_seq_fail = seq_fail; + last_socket_recv = socket_recv; + last_client_call = client_call; + last_num_client_req = num_client_req; + last_num_propose = num_propose; + last_num_prepare = num_prepare; + last_num_commit = num_commit; + 
last_pending_execute = pending_execute; + last_execute = execute; + last_execute_done = execute_done; + + last_broad_cast_msg = broad_cast_msg; + last_send_broad_cast_msg = send_broad_cast_msg; + last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep; + + last_server_call = server_call; + last_server_process = server_process; + + last_run_req_num = run_req_num; + last_run_req_run_time = run_req_run_time; + last_total_request = total_request; + last_total_geo_request = total_geo_request; + last_geo_request = geo_request; + } + } + + void Stats::IncClientCall() { + if (prometheus_) { + prometheus_->Inc(CLIENT_CALL, 1); + } + client_call_++; + } + + void Stats::IncClientRequest() { + if (prometheus_) { + prometheus_->Inc(CLIENT_REQ, 1); + } + num_client_req_++; + } + + void Stats::IncPropose() { + if (prometheus_) { + prometheus_->Inc(PROPOSE, 1); + } + num_propose_++; + } + + void Stats::IncPrepare() { + if (prometheus_) { + prometheus_->Inc(PREPARE, 1); + } + num_prepare_++; + transaction_summary_.prepare_message_count_times_list.push_back( + std::chrono::system_clock::now()); + } + + void Stats::IncCommit() { + if (prometheus_) { + prometheus_->Inc(COMMIT, 1); + } + num_commit_++; + transaction_summary_.commit_message_count_times_list.push_back( + std::chrono::system_clock::now()); + } + + void Stats::IncPendingExecute() { pending_execute_++; } + + void Stats::IncExecute() { execute_++; } + + void Stats::IncExecuteDone() { + if (prometheus_) { + prometheus_->Inc(EXECUTE, 1); + } + execute_done_++; + } + + void Stats::BroadCastMsg() { + if (prometheus_) { + prometheus_->Inc(BROAD_CAST, 1); + } + broad_cast_msg_++; + } + + void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; } + + void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; } + + void Stats::SeqFail() { seq_fail_++; } + + void Stats::IncTotalRequest(uint32_t num) { + if (prometheus_) { + prometheus_->Inc(NUM_EXECUTE_TX, num); + } + total_request_ += num; + } + + 
void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; } + + void Stats::IncGeoRequest() { geo_request_++; } + + void Stats::ServerCall() { + if (prometheus_) { + prometheus_->Inc(SERVER_CALL_NAME, 1); + } + server_call_++; + } + + void Stats::ServerProcess() { + if (prometheus_) { + prometheus_->Inc(SERVER_PROCESS, 1); + } + server_process_++; + } + + void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; } + + void Stats::AddLatency(uint64_t run_time) { + run_req_num_++; + run_req_run_time_ += run_time; + } + + void Stats::SetPrometheus(const std::string& prometheus_address) { + prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address); + } + + } // namespace resdb \ No newline at end of file diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index a2524fc1..c0ad30a7 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -17,177 +17,151 @@ * under the License. */ -#pragma once - -#include <crow.h> - -#include <chrono> -#include <future> -#include <map> -#include <nlohmann/json.hpp> - -#include "boost/asio.hpp" -#include "boost/beast.hpp" -#include "platform/common/network/tcp_socket.h" -#include "platform/proto/resdb.pb.h" -#include "platform/statistic/prometheus_handler.h" -#include "proto/kv/kv.pb.h" -#include "sys/resource.h" - -// Forward declaration for LevelDB -namespace leveldb { -class DB; -} - -namespace asio = boost::asio; -namespace beast = boost::beast; -using tcp = asio::ip::tcp; - -namespace resdb { - -struct VisualData { - // Set when initializing - int replica_id; - int primary_id; - std::string ip; - int port; - - // Set when new txn is received - int txn_number; - std::vector<std::string> txn_command; - std::vector<std::string> txn_key; - std::vector<std::string> txn_value; - - // Request state if primary_id==replica_id, pre_prepare state otherwise - std::chrono::system_clock::time_point request_pre_prepare_state_time; - std::chrono::system_clock::time_point
prepare_state_time; - std::vector<std::chrono::system_clock::time_point> - prepare_message_count_times_list; - std::chrono::system_clock::time_point commit_state_time; - std::vector<std::chrono::system_clock::time_point> - commit_message_count_times_list; - std::chrono::system_clock::time_point execution_time; - - // Storage Engine Stats - double ext_cache_hit_ratio_; - std::string level_db_stats_; - std::string level_db_approx_mem_size_; - - // process stats - struct rusage process_stats_; -}; - -class Stats { - public: - static Stats* GetGlobalStats(int sleep_seconds = 5); - - void Stop(); - - void RetrieveProgress(); - void SetProps(int replica_id, std::string ip, int port, bool resview_flag, - bool faulty_flag); - void SetPrimaryId(int primary_id); - void SetStorageEngineMetrics(double ext_cache_hit_ratio, - std::string level_db_stats, - std::string level_db_approx_mem_size); - void RecordStateTime(std::string state); - void GetTransactionDetails(BatchUserRequest batch_request); - void SendSummary(); - void CrowRoute(); - bool IsFaulty(); - void ChangePrimary(int primary_id); - - // Timeline event recording (disk-based, only when resview enabled) - void RecordNetworkRecv(uint64_t seq); - void RecordPrepareRecv(uint64_t seq, int sender_id); - void RecordCommitRecv(uint64_t seq, int sender_id); - void RecordExecuteStart(uint64_t seq); - void RecordExecuteEnd(uint64_t seq); - void RecordResponseSent(uint64_t seq); - void CleanupOldTimelineEntries(); // Prevent timeline buffer memory leak - - void AddLatency(uint64_t run_time); - - void Monitor(); - void MonitorGlobal(); - - void IncSocketRecv(); - - void IncClientCall(); - - void IncClientRequest(); - void IncPropose(); - void IncPrepare(); - void IncCommit(); - void IncPendingExecute(); - void IncExecute(); - void IncExecuteDone(); - - void BroadCastMsg(); - void SendBroadCastMsg(uint32_t num); - void SendBroadCastMsgPerRep(); - void SeqFail(); - void IncTotalRequest(uint32_t num); - void
IncTotalGeoRequest(uint32_t num); - void IncGeoRequest(); - - void SeqGap(uint64_t seq_gap); - // Network in->worker - void ServerCall(); - void ServerProcess(); - void SetPrometheus(const std::string& prometheus_address); - - protected: - Stats(int sleep_time = 5); - ~Stats(); - - private: - std::string monitor_port_ = "default"; - std::string name_; - std::atomic<int> num_call_, run_call_; - std::atomic<uint64_t> last_time_, run_time_, run_call_time_; - std::thread thread_; - std::atomic<bool> begin_; - std::atomic<bool> stop_; - std::condition_variable cv_; - std::mutex mutex_; - - std::thread global_thread_; - std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_, - num_commit_, pending_execute_, execute_, execute_done_; - std::atomic<uint64_t> client_call_, socket_recv_; - std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_, - send_broad_cast_msg_per_rep_; - std::atomic<uint64_t> seq_fail_; - std::atomic<uint64_t> server_call_, server_process_; - std::atomic<uint64_t> run_req_num_; - std::atomic<uint64_t> run_req_run_time_; - std::atomic<uint64_t> seq_gap_; - std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_; - int monitor_sleep_time_ = 5; // default 5s.
- - std::thread crow_thread_; - bool enable_resview; - bool enable_faulty_switch_; - VisualData transaction_summary_; - std::atomic<bool> make_faulty_; - std::atomic<uint64_t> prev_num_prepare_; - std::atomic<uint64_t> prev_num_commit_; - nlohmann::json summary_json_; - - std::unique_ptr<PrometheusHandler> prometheus_; - - std::unique_ptr<leveldb::DB> summary_db_; - std::mutex summary_db_mutex_; - - // In-memory timeline event buffer (per-seq) - batched writes to LevelDB - std::map<uint64_t, std::vector<nlohmann::json>> timeline_buffer_; - std::mutex timeline_buffer_mutex_; - - // Timeline directory for per-transaction event logs - std::string timeline_dir_; - std::mutex timeline_mutex_; - void WriteTimelineEvent(uint64_t seq, const std::string& phase, int sender_id = -1); -}; - -} // namespace resdb + #pragma once + + #include <crow.h> + + #include <chrono> + #include <future> + #include <nlohmann/json.hpp> + + #include "boost/asio.hpp" + #include "boost/beast.hpp" + #include "platform/common/network/tcp_socket.h" + #include "platform/proto/resdb.pb.h" + #include "platform/statistic/prometheus_handler.h" + #include "proto/kv/kv.pb.h" + #include "sys/resource.h" + + namespace asio = boost::asio; + namespace beast = boost::beast; + using tcp = asio::ip::tcp; + + namespace resdb { + + struct VisualData { + // Set when initializing + int replica_id; + int primary_id; + std::string ip; + int port; + + // Set when new txn is received + uint64_t txn_number; // Same width as sequence numbers so large seqs are not truncated. + std::vector<std::string> txn_command; + std::vector<std::string> txn_key; + std::vector<std::string> txn_value; + + // Request state if primary_id==replica_id, pre_prepare state otherwise + std::chrono::system_clock::time_point request_pre_prepare_state_time; + std::chrono::system_clock::time_point prepare_state_time; + std::vector<std::chrono::system_clock::time_point> + prepare_message_count_times_list; + std::chrono::system_clock::time_point commit_state_time; + std::vector<std::chrono::system_clock::time_point> +
commit_message_count_times_list; + std::chrono::system_clock::time_point execution_time; + + // Storage Engine Stats + double ext_cache_hit_ratio_; + std::string level_db_stats_; + std::string level_db_approx_mem_size_; + + // process stats + struct rusage process_stats_; + }; + + class Stats { + public: + static Stats* GetGlobalStats(int sleep_seconds = 5); + + void Stop(); + + void RetrieveProgress(); + void SetProps(int replica_id, std::string ip, int port, bool resview_flag, + bool faulty_flag); + void SetPrimaryId(int primary_id); + void SetStorageEngineMetrics(double ext_cache_hit_ratio, + std::string level_db_stats, + std::string level_db_approx_mem_size); + void RecordStateTime(std::string state); + void GetTransactionDetails(BatchUserRequest batch_request); + void SendSummary(); + void CrowRoute(); + bool IsFaulty(); + void ChangePrimary(int primary_id); + + void AddLatency(uint64_t run_time); + + void Monitor(); + void MonitorGlobal(); + + void IncSocketRecv(); + + void IncClientCall(); + + void IncClientRequest(); + void IncPropose(); + void IncPrepare(); + void IncCommit(); + void IncPendingExecute(); + void IncExecute(); + void IncExecuteDone(); + + void BroadCastMsg(); + void SendBroadCastMsg(uint32_t num); + void SendBroadCastMsgPerRep(); + void SeqFail(); + void IncTotalRequest(uint32_t num); + void IncTotalGeoRequest(uint32_t num); + void IncGeoRequest(); + + void SeqGap(uint64_t seq_gap); + // Network in->worker + void ServerCall(); + void ServerProcess(); + void SetPrometheus(const std::string& prometheus_address); + + protected: + Stats(int sleep_time = 5); + ~Stats(); + + private: + std::string monitor_port_ = "default"; + std::string name_; + std::atomic<int> num_call_, run_call_; + std::atomic<uint64_t> last_time_, run_time_, run_call_time_; + std::thread thread_; + std::atomic<bool> begin_; + std::atomic<bool> stop_; + std::condition_variable cv_; + std::mutex mutex_; + + std::thread global_thread_; + std::atomic<uint64_t> num_client_req_,
num_propose_, num_prepare_, + num_commit_, pending_execute_, execute_, execute_done_; + std::atomic<uint64_t> client_call_, socket_recv_; + std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_, + send_broad_cast_msg_per_rep_; + std::atomic<uint64_t> seq_fail_; + std::atomic<uint64_t> server_call_, server_process_; + std::atomic<uint64_t> run_req_num_; + std::atomic<uint64_t> run_req_run_time_; + std::atomic<uint64_t> seq_gap_; + std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_; + int monitor_sleep_time_ = 5; // default 5s. + + std::thread crow_thread_; + bool enable_resview = false; // Set by SetProps(); ~Stats() reads it even if SetProps() never ran. + bool enable_faulty_switch_ = false; // Set by SetProps(). + VisualData transaction_summary_; + std::atomic<bool> make_faulty_; + std::atomic<uint64_t> prev_num_prepare_; + std::atomic<uint64_t> prev_num_commit_; + nlohmann::json summary_json_; + nlohmann::json consensus_history_; + + std::unique_ptr<PrometheusHandler> prometheus_; + }; + + } // namespace resdb \ No newline at end of file
