This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/development by this push:
     new 6b0165ce added more prometheus metrics
6b0165ce is described below

commit 6b0165ce6286d2470f3d1445141add95a1ac8e8a
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 07:29:47 2026 +0000

    added more prometheus metrics
---
 platform/statistic/prometheus_handler.cpp |  62 ++++++++++++++++-
 platform/statistic/prometheus_handler.h   |  26 +++++++
 platform/statistic/stats.cpp              | 109 +++++++++++++++++++++++++++---
 platform/statistic/stats.h                |   2 +
 4 files changed, 187 insertions(+), 12 deletions(-)

diff --git a/platform/statistic/prometheus_handler.cpp b/platform/statistic/prometheus_handler.cpp
index 644f6a18..fbe5886f 100644
--- a/platform/statistic/prometheus_handler.cpp
+++ b/platform/statistic/prometheus_handler.cpp
@@ -43,7 +43,23 @@ std::map<MetricName, std::pair<TableName, std::string>> metric_names = {
     {PREPARE, {CONSENSUS, "prepare"}},
     {COMMIT, {CONSENSUS, "commit"}},
     {EXECUTE, {CONSENSUS, "execute"}},
-    {NUM_EXECUTE_TX, {CONSENSUS, "num_execute_tx"}}};
+    {NUM_EXECUTE_TX, {CONSENSUS, "num_execute_tx"}},
+    {CLIENT_LATENCY, {CLIENT, "client_latency"}},
+    {SEQ_FAIL, {CONSENSUS, "seq_fail"}},
+    {SEQ_GAP, {CONSENSUS, "seq_gap"}},
+    {PENDING_EXECUTE, {CONSENSUS, "pending_execute"}},
+    {EXECUTE_DONE, {CONSENSUS, "execute_done"}},
+    {SEND_BROADCAST_MSG, {IO_THREAD, "send_broadcast_msg"}},
+    {TOTAL_REQUEST, {CONSENSUS, "total_request"}},
+    {PREPARE_PHASE_LATENCY, {CONSENSUS, "prepare_phase_latency"}},
+    {COMMIT_PHASE_LATENCY, {CONSENSUS, "commit_phase_latency"}},
+    {EXECUTION_DURATION, {CONSENSUS, "execution_duration"}},
+    {CACHE_HIT_RATIO, {GENERAL, "cache_hit_ratio"}},
+    {LEVELDB_MEM_SIZE, {GENERAL, "leveldb_mem_size"}},
+    {SEND_BROADCAST_PER_REP, {IO_THREAD, "send_broadcast_per_rep"}},
+    {GEO_REQUEST, {CLIENT, "geo_request"}},
+    {TOTAL_GEO_REQUEST, {CLIENT, "total_geo_request"}},
+    {VIEW_CHANGE, {CONSENSUS, "view_change"}}};
 
 PrometheusHandler::PrometheusHandler(const std::string& server_address) {
   exposer_ =
@@ -67,8 +83,17 @@ void PrometheusHandler::Register() {
   }
 
   for (auto& metric_pair : metric_names) {
-    RegisterMetric(table_names[metric_pair.second.first],
-                   metric_pair.second.second);
+    // Register Summary metrics (latencies)
+    if (metric_pair.first == CLIENT_LATENCY ||
+        metric_pair.first == PREPARE_PHASE_LATENCY ||
+        metric_pair.first == COMMIT_PHASE_LATENCY ||
+        metric_pair.first == EXECUTION_DURATION) {
+      RegisterSummaryMetric(table_names[metric_pair.second.first],
+                            metric_pair.second.second);
+    } else {
+      RegisterMetric(table_names[metric_pair.second.first],
+                     metric_pair.second.second);
+    }
   }
 }
 
@@ -105,4 +130,35 @@ void PrometheusHandler::Inc(MetricName name, double value) {
   metric_[metric_name_str]->Increment(value);
 }
 
+void PrometheusHandler::RegisterSummaryMetric(const std::string& table_name,
+                                              const std::string& metric_name) {
+  // Create summary family if it doesn't exist
+  if (summary_.find(table_name) == summary_.end()) {
+    sbuilder* summary_family = &prometheus::BuildSummary()
+                                   .Name(table_name + "_summary")
+                                   .Help(table_name + " summary metrics")
+                                   .Register(*registry_);
+    summary_[table_name] = summary_family;
+  }
+  
+  // Create summary metric
+  smetric* summary_metric = &summary_[table_name]->Add(
+      {{"metrics", metric_name}},
+      prometheus::Summary::Quantiles{
+          {0.5, 0.05},   // p50 with 5% error
+          {0.9, 0.01},   // p90 with 1% error
+          {0.95, 0.01},  // p95 with 1% error
+          {0.99, 0.001}  // p99 with 0.1% error
+      });
+  summary_metric_[metric_name] = summary_metric;
+}
+
+void PrometheusHandler::Observe(MetricName name, double value) {
+  std::string metric_name_str = metric_names[name].second;
+  if (summary_metric_.find(metric_name_str) == summary_metric_.end()) {
+    return;
+  }
+  summary_metric_[metric_name_str]->Observe(value);
+}
+
 }  // namespace resdb
diff --git a/platform/statistic/prometheus_handler.h b/platform/statistic/prometheus_handler.h
index b3a8b4d9..57cb18eb 100644
--- a/platform/statistic/prometheus_handler.h
+++ b/platform/statistic/prometheus_handler.h
@@ -23,6 +23,7 @@
 #include <prometheus/counter.h>
 #include <prometheus/exposer.h>
 #include <prometheus/registry.h>
+#include <prometheus/summary.h>
 
 namespace resdb {
 
@@ -48,6 +49,23 @@ enum MetricName {
   COMMIT,
   EXECUTE,
   NUM_EXECUTE_TX,
+  CLIENT_LATENCY,
+  // New metrics
+  SEQ_FAIL,
+  SEQ_GAP,
+  PENDING_EXECUTE,
+  EXECUTE_DONE,
+  SEND_BROADCAST_MSG,
+  TOTAL_REQUEST,
+  PREPARE_PHASE_LATENCY,
+  COMMIT_PHASE_LATENCY,
+  EXECUTION_DURATION,
+  CACHE_HIT_RATIO,
+  LEVELDB_MEM_SIZE,
+  SEND_BROADCAST_PER_REP,
+  GEO_REQUEST,
+  TOTAL_GEO_REQUEST,
+  VIEW_CHANGE,
 };
 
 class PrometheusHandler {
@@ -57,6 +75,7 @@ class PrometheusHandler {
 
   void Set(MetricName name, double value);
   void Inc(MetricName name, double value);
+  void Observe(MetricName name, double value);
 
  protected:
   void Register();
@@ -67,6 +86,8 @@ class PrometheusHandler {
  private:
   typedef prometheus::Family<prometheus::Gauge> gbuilder;
   typedef prometheus::Gauge gmetric;
+  typedef prometheus::Family<prometheus::Summary> sbuilder;
+  typedef prometheus::Summary smetric;
 
   std::unique_ptr<prometheus::Exposer, std::default_delete<prometheus::Exposer>>
       exposer_;
@@ -74,6 +95,11 @@ class PrometheusHandler {
 
   std::map<std::string, gbuilder*> gauge_;
   std::map<std::string, gmetric*> metric_;
+  std::map<std::string, sbuilder*> summary_;
+  std::map<std::string, smetric*> summary_metric_;
+  
+  void RegisterSummaryMetric(const std::string& table_name,
+                              const std::string& metric_name);
 };
 
 }  // namespace resdb
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index dcda37dd..8fd1a57f 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -85,6 +85,9 @@ Stats::Stats(int sleep_time) {
 
   // Initialize static telemetry info
   static_telemetry_info_.port = -1;
+  
+  // Initialize previous_primary_id_ to detect view changes
+  previous_primary_id_ = -1;
 }
 
 void Stats::Stop() { stop_ = true; }
@@ -280,6 +283,13 @@ bool Stats::IsFaulty() { return make_faulty_.load(); }
 void Stats::ChangePrimary(int primary_id) {
   transaction_summary_.primary_id = primary_id;
   make_faulty_.store(false);
+  
+  // Detect view change
+  if (prometheus_ && previous_primary_id_ != -1 && 
+      previous_primary_id_ != primary_id) {
+    prometheus_->Inc(VIEW_CHANGE, 1);
+  }
+  previous_primary_id_ = primary_id;
 }
 
 void Stats::SetProps(int replica_id, std::string ip, int port,
@@ -301,6 +311,12 @@ void Stats::SetProps(int replica_id, std::string ip, int port,
 void Stats::SetPrimaryId(int primary_id) {
   transaction_summary_.primary_id = primary_id;
   static_telemetry_info_.primary_id = primary_id;
+  
+  if (prometheus_ && previous_primary_id_ != -1 && 
+      previous_primary_id_ != primary_id) {
+    prometheus_->Inc(VIEW_CHANGE, 1);
+  }
+  previous_primary_id_ = primary_id;
 }
 
 void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
@@ -312,6 +328,17 @@ void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
   static_telemetry_info_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
   static_telemetry_info_.level_db_stats_ = level_db_stats;
   static_telemetry_info_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+  
+  if (prometheus_) {
+    prometheus_->Set(CACHE_HIT_RATIO, ext_cache_hit_ratio);
+    // Parse level_db_approx_mem_size as number (it's a string like "4104")
+    try {
+      double mem_size = std::stod(level_db_approx_mem_size);
+      prometheus_->Set(LEVELDB_MEM_SIZE, mem_size);
+    } catch (...) {
+      // Ignore parse errors
+    }
+  }
 }
 
 size_t Stats::GetShardIndex(uint64_t seq) const {
@@ -364,7 +391,7 @@ void Stats::RecordStateTime(std::string state) {
 }
 
 void Stats::RecordStateTime(uint64_t seq, std::string state) {
-  if (!enable_resview) {
+  if (!enable_resview && !prometheus_) {
     return;
   }
 
@@ -398,8 +425,24 @@ void Stats::RecordStateTime(uint64_t seq, std::string state) {
     telemetry.request_pre_prepare_state_time = now;
   } else if (state == "prepare") {
     telemetry.prepare_state_time = now;
+    
+    // Calculate prepare phase latency
+    if (prometheus_ && telemetry.request_pre_prepare_state_time != 
+        std::chrono::system_clock::time_point::min()) {
+      auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+          now - telemetry.request_pre_prepare_state_time).count();
+      prometheus_->Observe(PREPARE_PHASE_LATENCY, duration / 1e6); // Convert to seconds
+    }
   } else if (state == "commit") {
     telemetry.commit_state_time = now;
+    
+    // Calculate commit phase latency
+    if (prometheus_ && telemetry.prepare_state_time != 
+        std::chrono::system_clock::time_point::min()) {
+      auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+          now - telemetry.prepare_state_time).count();
+      prometheus_->Observe(COMMIT_PHASE_LATENCY, duration / 1e6); // Convert to seconds
+    }
   }
 }
 
@@ -697,6 +740,14 @@ void Stats::SendSummary(uint64_t seq) {
   // CRITICAL: Set execution_time before serialization, but don't modify any
   // other fields
   telemetry.execution_time = std::chrono::system_clock::now();
+  
+  // Calculate execution duration
+  if (prometheus_ && telemetry.commit_state_time != 
+      std::chrono::system_clock::time_point::min()) {
+    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+        telemetry.execution_time - telemetry.commit_state_time).count();
+    prometheus_->Observe(EXECUTION_DURATION, duration / 1e6); // Convert to seconds
+  }
 
   // Use automatic JSON serialization - this reads current state, doesn't modify
   // it
@@ -847,7 +898,7 @@ void Stats::MonitorGlobal() {
       LOG(ERROR) << "  req client latency:"
                  << static_cast<double>(run_req_run_time -
                                         last_run_req_run_time) /
-                        (run_req_num - last_run_req_num) / 1000000000.0;
+                        (run_req_num - last_run_req_num) / 1000000.0;
     }
 
     last_seq_fail = seq_fail;
@@ -987,13 +1038,19 @@ void Stats::IncCommit(uint64_t seq) {
   }
 }
 
-void Stats::IncPendingExecute() { pending_execute_++; }
+void Stats::IncPendingExecute() { 
+  pending_execute_++; 
+  if (prometheus_) {
+    prometheus_->Set(PENDING_EXECUTE, static_cast<double>(pending_execute_.load()));
+  }
+}
 
 void Stats::IncExecute() { execute_++; }
 
 void Stats::IncExecuteDone() {
   if (prometheus_) {
     prometheus_->Inc(EXECUTE, 1);
+    prometheus_->Inc(EXECUTE_DONE, 1);
   }
   execute_done_++;
 }
@@ -1005,22 +1062,48 @@ void Stats::BroadCastMsg() {
   broad_cast_msg_++;
 }
 
-void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
+void Stats::SendBroadCastMsg(uint32_t num) { 
+  send_broad_cast_msg_ += num; 
+  if (prometheus_) {
+    prometheus_->Inc(SEND_BROADCAST_MSG, static_cast<double>(num));
+  }
+}
 
-void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
+void Stats::SendBroadCastMsgPerRep() { 
+  send_broad_cast_msg_per_rep_++; 
+  if (prometheus_) {
+    prometheus_->Inc(SEND_BROADCAST_PER_REP, 1);
+  }
+}
 
-void Stats::SeqFail() { seq_fail_++; }
+void Stats::SeqFail() { 
+  seq_fail_++; 
+  if (prometheus_) {
+    prometheus_->Inc(SEQ_FAIL, 1);
+  }
+}
 
 void Stats::IncTotalRequest(uint32_t num) {
   if (prometheus_) {
     prometheus_->Inc(NUM_EXECUTE_TX, num);
+    prometheus_->Inc(TOTAL_REQUEST, static_cast<double>(num));
   }
   total_request_ += num;
 }
 
-void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
+void Stats::IncTotalGeoRequest(uint32_t num) { 
+  total_geo_request_ += num; 
+  if (prometheus_) {
+    prometheus_->Inc(TOTAL_GEO_REQUEST, static_cast<double>(num));
+  }
+}
 
-void Stats::IncGeoRequest() { geo_request_++; }
+void Stats::IncGeoRequest() { 
+  geo_request_++; 
+  if (prometheus_) {
+    prometheus_->Inc(GEO_REQUEST, 1);
+  }
+}
 
 void Stats::ServerCall() {
   if (prometheus_) {
@@ -1036,11 +1119,19 @@ void Stats::ServerProcess() {
   server_process_++;
 }
 
-void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
+void Stats::SeqGap(uint64_t seq_gap) { 
+  seq_gap_ = seq_gap; 
+  if (prometheus_) {
+    prometheus_->Set(SEQ_GAP, static_cast<double>(seq_gap));
+  }
+}
 
 void Stats::AddLatency(uint64_t run_time) {
   run_req_num_++;
   run_req_run_time_ += run_time;
+  if (prometheus_) {
+    prometheus_->Observe(CLIENT_LATENCY, run_time / 1e6);
+  }
 }
 
 void Stats::SetPrometheus(const std::string& prometheus_address) {
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 3f711ef9..0f29d8aa 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -289,6 +289,8 @@ class Stats {
   nlohmann::json consensus_history_;
   std::mutex consensus_history_mutex_;  // Protect consensus_history_ access
 
+  int previous_primary_id_ = -1;  // Track for view change detection
+
   std::unique_ptr<PrometheusHandler> prometheus_;
 };
 

Reply via email to