This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 6b0165ce6286d2470f3d1445141add95a1ac8e8a Author: bchou9 <[email protected]> AuthorDate: Thu Jan 8 07:29:47 2026 +0000 added more prometheus metrics --- platform/statistic/prometheus_handler.cpp | 62 ++++++++++++++++- platform/statistic/prometheus_handler.h | 26 +++++++ platform/statistic/stats.cpp | 109 +++++++++++++++++++++++++++--- platform/statistic/stats.h | 2 + 4 files changed, 187 insertions(+), 12 deletions(-) diff --git a/platform/statistic/prometheus_handler.cpp b/platform/statistic/prometheus_handler.cpp index 644f6a18..fbe5886f 100644 --- a/platform/statistic/prometheus_handler.cpp +++ b/platform/statistic/prometheus_handler.cpp @@ -43,7 +43,23 @@ std::map<MetricName, std::pair<TableName, std::string>> metric_names = { {PREPARE, {CONSENSUS, "prepare"}}, {COMMIT, {CONSENSUS, "commit"}}, {EXECUTE, {CONSENSUS, "execute"}}, - {NUM_EXECUTE_TX, {CONSENSUS, "num_execute_tx"}}}; + {NUM_EXECUTE_TX, {CONSENSUS, "num_execute_tx"}}, + {CLIENT_LATENCY, {CLIENT, "client_latency"}}, + {SEQ_FAIL, {CONSENSUS, "seq_fail"}}, + {SEQ_GAP, {CONSENSUS, "seq_gap"}}, + {PENDING_EXECUTE, {CONSENSUS, "pending_execute"}}, + {EXECUTE_DONE, {CONSENSUS, "execute_done"}}, + {SEND_BROADCAST_MSG, {IO_THREAD, "send_broadcast_msg"}}, + {TOTAL_REQUEST, {CONSENSUS, "total_request"}}, + {PREPARE_PHASE_LATENCY, {CONSENSUS, "prepare_phase_latency"}}, + {COMMIT_PHASE_LATENCY, {CONSENSUS, "commit_phase_latency"}}, + {EXECUTION_DURATION, {CONSENSUS, "execution_duration"}}, + {CACHE_HIT_RATIO, {GENERAL, "cache_hit_ratio"}}, + {LEVELDB_MEM_SIZE, {GENERAL, "leveldb_mem_size"}}, + {SEND_BROADCAST_PER_REP, {IO_THREAD, "send_broadcast_per_rep"}}, + {GEO_REQUEST, {CLIENT, "geo_request"}}, + {TOTAL_GEO_REQUEST, {CLIENT, "total_geo_request"}}, + {VIEW_CHANGE, {CONSENSUS, "view_change"}}}; PrometheusHandler::PrometheusHandler(const std::string& server_address) { exposer_ = @@ -67,8 +83,17 @@ void PrometheusHandler::Register() { } for (auto& metric_pair : metric_names) { - RegisterMetric(table_names[metric_pair.second.first], - metric_pair.second.second); + // Register Summary metrics (latencies) + if (metric_pair.first == CLIENT_LATENCY || + metric_pair.first == PREPARE_PHASE_LATENCY || + metric_pair.first == COMMIT_PHASE_LATENCY || + metric_pair.first == EXECUTION_DURATION) { + RegisterSummaryMetric(table_names[metric_pair.second.first], + metric_pair.second.second); + } else { + RegisterMetric(table_names[metric_pair.second.first], + metric_pair.second.second); + } } } @@ -105,4 +130,35 @@ void PrometheusHandler::Inc(MetricName name, double value) { metric_[metric_name_str]->Increment(value); } +void PrometheusHandler::RegisterSummaryMetric(const std::string& table_name, + const std::string& metric_name) { + // Create summary family if it doesn't exist + if (summary_.find(table_name) == summary_.end()) { + sbuilder* summary_family = &prometheus::BuildSummary() + .Name(table_name + "_summary") + .Help(table_name + " summary metrics") + .Register(*registry_); + summary_[table_name] = summary_family; + } + + // Create summary metric + smetric* summary_metric = &summary_[table_name]->Add( + {{"metrics", metric_name}}, + prometheus::Summary::Quantiles{ + {0.5, 0.05}, // p50 with 5% error + {0.9, 0.01}, // p90 with 1% error + {0.95, 0.01}, // p95 with 1% error + {0.99, 0.001} // p99 with 0.1% error + }); + summary_metric_[metric_name] = summary_metric; +} + +void PrometheusHandler::Observe(MetricName name, double value) { + std::string metric_name_str = metric_names[name].second; + if (summary_metric_.find(metric_name_str) == summary_metric_.end()) { + return; + } + summary_metric_[metric_name_str]->Observe(value); +} + } // namespace resdb diff --git a/platform/statistic/prometheus_handler.h b/platform/statistic/prometheus_handler.h index b3a8b4d9..57cb18eb 100644 --- a/platform/statistic/prometheus_handler.h +++ b/platform/statistic/prometheus_handler.h @@ -23,6 +23,7 @@ #include <prometheus/counter.h> #include <prometheus/exposer.h> #include <prometheus/registry.h> +#include <prometheus/summary.h> namespace resdb { @@ -48,6 +49,23 @@ enum MetricName { COMMIT, EXECUTE, NUM_EXECUTE_TX, + CLIENT_LATENCY, + // New metrics + SEQ_FAIL, + SEQ_GAP, + PENDING_EXECUTE, + EXECUTE_DONE, + SEND_BROADCAST_MSG, + TOTAL_REQUEST, + PREPARE_PHASE_LATENCY, + COMMIT_PHASE_LATENCY, + EXECUTION_DURATION, + CACHE_HIT_RATIO, + LEVELDB_MEM_SIZE, + SEND_BROADCAST_PER_REP, + GEO_REQUEST, + TOTAL_GEO_REQUEST, + VIEW_CHANGE, }; class PrometheusHandler { @@ -57,6 +75,7 @@ class PrometheusHandler { void Set(MetricName name, double value); void Inc(MetricName name, double value); + void Observe(MetricName name, double value); protected: void Register(); @@ -67,6 +86,8 @@ class PrometheusHandler { private: typedef prometheus::Family<prometheus::Gauge> gbuilder; typedef prometheus::Gauge gmetric; + typedef prometheus::Family<prometheus::Summary> sbuilder; + typedef prometheus::Summary smetric; std::unique_ptr<prometheus::Exposer, std::default_delete<prometheus::Exposer>> exposer_; @@ -74,6 +95,11 @@ class PrometheusHandler { std::map<std::string, gbuilder*> gauge_; std::map<std::string, gmetric*> metric_; + std::map<std::string, sbuilder*> summary_; + std::map<std::string, smetric*> summary_metric_; + + void RegisterSummaryMetric(const std::string& table_name, + const std::string& metric_name); }; } // namespace resdb diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index dcda37dd..8fd1a57f 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -85,6 +85,9 @@ Stats::Stats(int sleep_time) { // Initialize static telemetry info static_telemetry_info_.port = -1; + + // Initialize previous_primary_id_ to detect view changes + previous_primary_id_ = -1; } void Stats::Stop() { stop_ = true; } @@ -280,6 +283,13 @@ bool Stats::IsFaulty() { return make_faulty_.load(); } void Stats::ChangePrimary(int primary_id) { transaction_summary_.primary_id = primary_id; make_faulty_.store(false); + + // Detect view change + if (prometheus_ && previous_primary_id_ != -1 && + previous_primary_id_ != primary_id) { + prometheus_->Inc(VIEW_CHANGE, 1); + } + previous_primary_id_ = primary_id; } void Stats::SetProps(int replica_id, std::string ip, int port, @@ -301,6 +311,12 @@ void Stats::SetProps(int replica_id, std::string ip, int port, void Stats::SetPrimaryId(int primary_id) { transaction_summary_.primary_id = primary_id; static_telemetry_info_.primary_id = primary_id; + + if (prometheus_ && previous_primary_id_ != -1 && + previous_primary_id_ != primary_id) { + prometheus_->Inc(VIEW_CHANGE, 1); + } + previous_primary_id_ = primary_id; } void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio, @@ -312,6 +328,17 @@ void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio, static_telemetry_info_.ext_cache_hit_ratio_ = ext_cache_hit_ratio; static_telemetry_info_.level_db_stats_ = level_db_stats; static_telemetry_info_.level_db_approx_mem_size_ = level_db_approx_mem_size; + + if (prometheus_) { + prometheus_->Set(CACHE_HIT_RATIO, ext_cache_hit_ratio); + // Parse level_db_approx_mem_size as number (it's a string like "4104") + try { + double mem_size = std::stod(level_db_approx_mem_size); + prometheus_->Set(LEVELDB_MEM_SIZE, mem_size); + } catch (...) { + // Ignore parse errors + } + } } size_t Stats::GetShardIndex(uint64_t seq) const { @@ -364,7 +391,7 @@ void Stats::RecordStateTime(std::string state) { } void Stats::RecordStateTime(uint64_t seq, std::string state) { - if (!enable_resview) { + if (!enable_resview && !prometheus_) { return; } @@ -398,8 +425,24 @@ void Stats::RecordStateTime(uint64_t seq, std::string state) { telemetry.request_pre_prepare_state_time = now; } else if (state == "prepare") { telemetry.prepare_state_time = now; + + // Calculate prepare phase latency + if (prometheus_ && telemetry.request_pre_prepare_state_time != + std::chrono::system_clock::time_point::min()) { + auto duration = std::chrono::duration_cast<std::chrono::microseconds>( + now - telemetry.request_pre_prepare_state_time).count(); + prometheus_->Observe(PREPARE_PHASE_LATENCY, duration / 1e6); // Convert to seconds + } } else if (state == "commit") { telemetry.commit_state_time = now; + + // Calculate commit phase latency + if (prometheus_ && telemetry.prepare_state_time != + std::chrono::system_clock::time_point::min()) { + auto duration = std::chrono::duration_cast<std::chrono::microseconds>( + now - telemetry.prepare_state_time).count(); + prometheus_->Observe(COMMIT_PHASE_LATENCY, duration / 1e6); // Convert to seconds + } } } @@ -697,6 +740,14 @@ void Stats::SendSummary(uint64_t seq) { // CRITICAL: Set execution_time before serialization, but don't modify any // other fields telemetry.execution_time = std::chrono::system_clock::now(); + + // Calculate execution duration + if (prometheus_ && telemetry.commit_state_time != + std::chrono::system_clock::time_point::min()) { + auto duration = std::chrono::duration_cast<std::chrono::microseconds>( + telemetry.execution_time - telemetry.commit_state_time).count(); + prometheus_->Observe(EXECUTION_DURATION, duration / 1e6); // Convert to seconds + } // Use automatic JSON serialization - this reads current state, doesn't modify // it @@ -847,7 +898,7 @@ void Stats::MonitorGlobal() { LOG(ERROR) << " req client latency:" << static_cast<double>(run_req_run_time - last_run_req_run_time) / - (run_req_num - last_run_req_num) / 1000000000.0; + (run_req_num - last_run_req_num) / 1000000.0; } last_seq_fail = seq_fail; @@ -987,13 +1038,19 @@ void Stats::IncCommit(uint64_t seq) { } } -void Stats::IncPendingExecute() { pending_execute_++; } +void Stats::IncPendingExecute() { + pending_execute_++; + if (prometheus_) { + prometheus_->Set(PENDING_EXECUTE, static_cast<double>(pending_execute_.load())); + } +} void Stats::IncExecute() { execute_++; } void Stats::IncExecuteDone() { if (prometheus_) { prometheus_->Inc(EXECUTE, 1); + prometheus_->Inc(EXECUTE_DONE, 1); } execute_done_++; } @@ -1005,22 +1062,48 @@ void Stats::BroadCastMsg() { broad_cast_msg_++; } -void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; } +void Stats::SendBroadCastMsg(uint32_t num) { + send_broad_cast_msg_ += num; + if (prometheus_) { + prometheus_->Inc(SEND_BROADCAST_MSG, static_cast<double>(num)); + } +} -void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; } +void Stats::SendBroadCastMsgPerRep() { + send_broad_cast_msg_per_rep_++; + if (prometheus_) { + prometheus_->Inc(SEND_BROADCAST_PER_REP, 1); + } +} -void Stats::SeqFail() { seq_fail_++; } +void Stats::SeqFail() { + seq_fail_++; + if (prometheus_) { + prometheus_->Inc(SEQ_FAIL, 1); + } +} void Stats::IncTotalRequest(uint32_t num) { if (prometheus_) { prometheus_->Inc(NUM_EXECUTE_TX, num); + prometheus_->Inc(TOTAL_REQUEST, static_cast<double>(num)); } total_request_ += num; } -void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; } +void Stats::IncTotalGeoRequest(uint32_t num) { + total_geo_request_ += num; + if (prometheus_) { + prometheus_->Inc(TOTAL_GEO_REQUEST, static_cast<double>(num)); + } +} -void Stats::IncGeoRequest() { geo_request_++; } +void Stats::IncGeoRequest() { + geo_request_++; + if (prometheus_) { + prometheus_->Inc(GEO_REQUEST, 1); + } +} void Stats::ServerCall() { if (prometheus_) { @@ -1036,11 +1119,19 @@ void Stats::ServerProcess() { server_process_++; } -void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; } +void Stats::SeqGap(uint64_t seq_gap) { + seq_gap_ = seq_gap; + if (prometheus_) { + prometheus_->Set(SEQ_GAP, static_cast<double>(seq_gap)); + } +} void Stats::AddLatency(uint64_t run_time) { run_req_num_++; run_req_run_time_ += run_time; + if (prometheus_) { + prometheus_->Observe(CLIENT_LATENCY, run_time / 1e6); + } } void Stats::SetPrometheus(const std::string& prometheus_address) { diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 3f711ef9..0f29d8aa 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -289,6 +289,8 @@ class Stats { nlohmann::json consensus_history_; std::mutex consensus_history_mutex_; // Protect consensus_history_ access + int previous_primary_id_ = -1; // Track for view change detection + std::unique_ptr<PrometheusHandler> prometheus_; };
