This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new 6b0165ce added more prometheus metrics
6b0165ce is described below
commit 6b0165ce6286d2470f3d1445141add95a1ac8e8a
Author: bchou9 <[email protected]>
AuthorDate: Thu Jan 8 07:29:47 2026 +0000
Added more Prometheus metrics
---
platform/statistic/prometheus_handler.cpp | 62 ++++++++++++++++-
platform/statistic/prometheus_handler.h | 26 +++++++
platform/statistic/stats.cpp | 109 +++++++++++++++++++++++++++---
platform/statistic/stats.h | 2 +
4 files changed, 187 insertions(+), 12 deletions(-)
diff --git a/platform/statistic/prometheus_handler.cpp
b/platform/statistic/prometheus_handler.cpp
index 644f6a18..fbe5886f 100644
--- a/platform/statistic/prometheus_handler.cpp
+++ b/platform/statistic/prometheus_handler.cpp
@@ -43,7 +43,23 @@ std::map<MetricName, std::pair<TableName, std::string>>
metric_names = {
{PREPARE, {CONSENSUS, "prepare"}},
{COMMIT, {CONSENSUS, "commit"}},
{EXECUTE, {CONSENSUS, "execute"}},
- {NUM_EXECUTE_TX, {CONSENSUS, "num_execute_tx"}}};
+ {NUM_EXECUTE_TX, {CONSENSUS, "num_execute_tx"}},
+ {CLIENT_LATENCY, {CLIENT, "client_latency"}},
+ {SEQ_FAIL, {CONSENSUS, "seq_fail"}},
+ {SEQ_GAP, {CONSENSUS, "seq_gap"}},
+ {PENDING_EXECUTE, {CONSENSUS, "pending_execute"}},
+ {EXECUTE_DONE, {CONSENSUS, "execute_done"}},
+ {SEND_BROADCAST_MSG, {IO_THREAD, "send_broadcast_msg"}},
+ {TOTAL_REQUEST, {CONSENSUS, "total_request"}},
+ {PREPARE_PHASE_LATENCY, {CONSENSUS, "prepare_phase_latency"}},
+ {COMMIT_PHASE_LATENCY, {CONSENSUS, "commit_phase_latency"}},
+ {EXECUTION_DURATION, {CONSENSUS, "execution_duration"}},
+ {CACHE_HIT_RATIO, {GENERAL, "cache_hit_ratio"}},
+ {LEVELDB_MEM_SIZE, {GENERAL, "leveldb_mem_size"}},
+ {SEND_BROADCAST_PER_REP, {IO_THREAD, "send_broadcast_per_rep"}},
+ {GEO_REQUEST, {CLIENT, "geo_request"}},
+ {TOTAL_GEO_REQUEST, {CLIENT, "total_geo_request"}},
+ {VIEW_CHANGE, {CONSENSUS, "view_change"}}};
PrometheusHandler::PrometheusHandler(const std::string& server_address) {
exposer_ =
@@ -67,8 +83,17 @@ void PrometheusHandler::Register() {
}
for (auto& metric_pair : metric_names) {
- RegisterMetric(table_names[metric_pair.second.first],
- metric_pair.second.second);
+    const auto& table = table_names[metric_pair.second.first];
+    const auto& metric = metric_pair.second.second;
+    switch (metric_pair.first) {
+      // Latency/duration metrics are exported as quantile Summaries; every
+      // other metric keeps going through RegisterMetric().
+      case CLIENT_LATENCY:
+      case PREPARE_PHASE_LATENCY:
+      case COMMIT_PHASE_LATENCY:
+      case EXECUTION_DURATION:
+        RegisterSummaryMetric(table, metric);
+        break;
+      default:
+        RegisterMetric(table, metric);
+        break;
+    }
}
}
@@ -105,4 +130,35 @@ void PrometheusHandler::Inc(MetricName name, double value)
{
metric_[metric_name_str]->Increment(value);
}
+// Registers `metric_name` as a quantile Summary inside the Summary family of
+// `table_name` (named "<table_name>_summary"), creating that family the first
+// time the table is seen, and remembers the Summary for Observe().
+void PrometheusHandler::RegisterSummaryMetric(const std::string& table_name,
+                                              const std::string& metric_name) {
+  auto family_it = summary_.find(table_name);
+  if (family_it == summary_.end()) {
+    sbuilder& family = prometheus::BuildSummary()
+                           .Name(table_name + "_summary")
+                           .Help(table_name + " summary metrics")
+                           .Register(*registry_);
+    family_it = summary_.emplace(table_name, &family).first;
+  }
+
+  // Tracked quantiles, each paired with its allowed estimation error.
+  const prometheus::Summary::Quantiles quantiles{
+      {0.5, 0.05},   // p50, 5% error
+      {0.9, 0.01},   // p90, 1% error
+      {0.95, 0.01},  // p95, 1% error
+      {0.99, 0.001}  // p99, 0.1% error
+  };
+  smetric& summary = family_it->second->Add({{"metrics", metric_name}}, quantiles);
+  summary_metric_[metric_name] = &summary;
+}
+
+// Records `value` on the Summary registered for `name`. Names that have no
+// Summary (not in summary_metric_, or not in metric_names at all) are ignored.
+void PrometheusHandler::Observe(MetricName name, double value) {
+  // Use find() instead of operator[]: operator[] default-inserts an entry into
+  // the shared global metric_names table for an unmapped name, and as a
+  // non-const member it is not guaranteed race-free if several threads report
+  // metrics concurrently. A single find() on summary_metric_ also replaces
+  // the previous find()-then-operator[] double lookup.
+  const auto name_it = metric_names.find(name);
+  if (name_it == metric_names.end()) {
+    return;
+  }
+  const auto summary_it = summary_metric_.find(name_it->second.second);
+  if (summary_it == summary_metric_.end()) {
+    return;
+  }
+  summary_it->second->Observe(value);
+}
+
} // namespace resdb
diff --git a/platform/statistic/prometheus_handler.h
b/platform/statistic/prometheus_handler.h
index b3a8b4d9..57cb18eb 100644
--- a/platform/statistic/prometheus_handler.h
+++ b/platform/statistic/prometheus_handler.h
@@ -23,6 +23,7 @@
#include <prometheus/counter.h>
#include <prometheus/exposer.h>
#include <prometheus/registry.h>
+#include <prometheus/summary.h>
namespace resdb {
@@ -48,6 +49,23 @@ enum MetricName {
COMMIT,
EXECUTE,
NUM_EXECUTE_TX,
+ // Metrics added with Summary support. PrometheusHandler::Register() registers
+ // CLIENT_LATENCY, PREPARE_PHASE_LATENCY, COMMIT_PHASE_LATENCY and
+ // EXECUTION_DURATION as quantile Summaries (fed via Observe()); the others
+ // are registered through RegisterMetric() and updated via Set()/Inc().
+ CLIENT_LATENCY,
+ SEQ_FAIL,
+ SEQ_GAP,
+ PENDING_EXECUTE,
+ EXECUTE_DONE,
+ SEND_BROADCAST_MSG,
+ TOTAL_REQUEST,
+ PREPARE_PHASE_LATENCY,
+ COMMIT_PHASE_LATENCY,
+ EXECUTION_DURATION,
+ CACHE_HIT_RATIO,
+ LEVELDB_MEM_SIZE,
+ SEND_BROADCAST_PER_REP,
+ GEO_REQUEST,
+ TOTAL_GEO_REQUEST,
+ VIEW_CHANGE,
};
class PrometheusHandler {
@@ -57,6 +75,7 @@ class PrometheusHandler {
void Set(MetricName name, double value);
void Inc(MetricName name, double value);
+ // Records `value` on the Summary registered for `name`; does nothing when no
+ // Summary was registered for that metric.
+ void Observe(MetricName name, double value);
protected:
void Register();
@@ -67,6 +86,8 @@ class PrometheusHandler {
private:
typedef prometheus::Family<prometheus::Gauge> gbuilder;
typedef prometheus::Gauge gmetric;
+ typedef prometheus::Family<prometheus::Summary> sbuilder;
+ typedef prometheus::Summary smetric;
std::unique_ptr<prometheus::Exposer,
std::default_delete<prometheus::Exposer>>
exposer_;
@@ -74,6 +95,11 @@ class PrometheusHandler {
std::map<std::string, gbuilder*> gauge_;
std::map<std::string, gmetric*> metric_;
+ // Summary families keyed by table name (each named "<table>_summary").
+ std::map<std::string, sbuilder*> summary_;
+ // Summary instances keyed by metric name; looked up by Observe().
+ std::map<std::string, smetric*> summary_metric_;
+
+ // Registers `metric_name` as a quantile Summary in `table_name`'s Summary
+ // family, creating the family on first use.
+ void RegisterSummaryMetric(const std::string& table_name,
+ const std::string& metric_name);
};
} // namespace resdb
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index dcda37dd..8fd1a57f 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -85,6 +85,9 @@ Stats::Stats(int sleep_time) {
// Initialize static telemetry info
static_telemetry_info_.port = -1;
+
+ // Initialize previous_primary_id_ to detect view changes
+ previous_primary_id_ = -1;
}
void Stats::Stop() { stop_ = true; }
@@ -280,6 +283,13 @@ bool Stats::IsFaulty() { return make_faulty_.load(); }
void Stats::ChangePrimary(int primary_id) {
transaction_summary_.primary_id = primary_id;
make_faulty_.store(false);
+
+  // A view change is a switch from one known primary to a different one; the
+  // first assignment (tracker still at -1) is not counted.
+  const bool is_view_change =
+      previous_primary_id_ != -1 && previous_primary_id_ != primary_id;
+  previous_primary_id_ = primary_id;
+  if (is_view_change && prometheus_) {
+    prometheus_->Inc(VIEW_CHANGE, 1);
+  }
}
void Stats::SetProps(int replica_id, std::string ip, int port,
@@ -301,6 +311,12 @@ void Stats::SetProps(int replica_id, std::string ip, int
port,
void Stats::SetPrimaryId(int primary_id) {
transaction_summary_.primary_id = primary_id;
static_telemetry_info_.primary_id = primary_id;
+
+  // Same view-change rule as ChangePrimary(): count only a switch between two
+  // known primaries, never the initial assignment from -1.
+  const bool primary_switched =
+      previous_primary_id_ != -1 && previous_primary_id_ != primary_id;
+  previous_primary_id_ = primary_id;
+  if (primary_switched && prometheus_) {
+    prometheus_->Inc(VIEW_CHANGE, 1);
+  }
}
void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
@@ -312,6 +328,17 @@ void Stats::SetStorageEngineMetrics(double
ext_cache_hit_ratio,
static_telemetry_info_.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
static_telemetry_info_.level_db_stats_ = level_db_stats;
static_telemetry_info_.level_db_approx_mem_size_ = level_db_approx_mem_size;
+
+  if (prometheus_) {
+    prometheus_->Set(CACHE_HIT_RATIO, ext_cache_hit_ratio);
+    // level_db_approx_mem_size is a decimal string (e.g. "4104"). Publish it
+    // only when it parses. Catch just the two exceptions std::stod documents,
+    // so a parse failure stays best-effort without silently swallowing
+    // unrelated errors the way catch (...) did.
+    try {
+      const double mem_size = std::stod(level_db_approx_mem_size);
+      prometheus_->Set(LEVELDB_MEM_SIZE, mem_size);
+    } catch (const std::invalid_argument&) {
+      // Not a number: leave LEVELDB_MEM_SIZE unchanged.
+    } catch (const std::out_of_range&) {
+      // Does not fit in a double: leave LEVELDB_MEM_SIZE unchanged.
+    }
+  }
}
size_t Stats::GetShardIndex(uint64_t seq) const {
@@ -364,7 +391,7 @@ void Stats::RecordStateTime(std::string state) {
}
void Stats::RecordStateTime(uint64_t seq, std::string state) {
- if (!enable_resview) {
+ if (!enable_resview && !prometheus_) {
return;
}
@@ -398,8 +425,24 @@ void Stats::RecordStateTime(uint64_t seq, std::string
state) {
telemetry.request_pre_prepare_state_time = now;
} else if (state == "prepare") {
telemetry.prepare_state_time = now;
+
+    // Prepare-phase latency: elapsed time from request_pre_prepare_state_time
+    // to `now`, exported in seconds. Skipped while that timestamp equals
+    // time_point::min() (presumably "not recorded yet").
+    if (prometheus_ && telemetry.request_pre_prepare_state_time !=
+                           std::chrono::system_clock::time_point::min()) {
+      // A floating-point duration yields seconds directly and keeps
+      // sub-microsecond precision instead of truncating to whole microseconds.
+      const std::chrono::duration<double> elapsed =
+          now - telemetry.request_pre_prepare_state_time;
+      prometheus_->Observe(PREPARE_PHASE_LATENCY, elapsed.count());
+    }
} else if (state == "commit") {
telemetry.commit_state_time = now;
+
+    // Commit-phase latency: elapsed time from prepare_state_time to `now`,
+    // exported in seconds. Skipped while prepare_state_time equals
+    // time_point::min() (presumably "not recorded yet").
+    if (prometheus_ && telemetry.prepare_state_time !=
+                           std::chrono::system_clock::time_point::min()) {
+      // A floating-point duration yields seconds directly and keeps
+      // sub-microsecond precision instead of truncating to whole microseconds.
+      const std::chrono::duration<double> elapsed =
+          now - telemetry.prepare_state_time;
+      prometheus_->Observe(COMMIT_PHASE_LATENCY, elapsed.count());
+    }
}
}
@@ -697,6 +740,14 @@ void Stats::SendSummary(uint64_t seq) {
// CRITICAL: Set execution_time before serialization, but don't modify any
// other fields
telemetry.execution_time = std::chrono::system_clock::now();
+
+  // Execution duration: elapsed time from commit_state_time to
+  // execution_time, exported in seconds. Skipped while commit_state_time
+  // equals time_point::min() (presumably "not recorded yet").
+  if (prometheus_ && telemetry.commit_state_time !=
+                         std::chrono::system_clock::time_point::min()) {
+    // A floating-point duration yields seconds directly and keeps
+    // sub-microsecond precision instead of truncating to whole microseconds.
+    const std::chrono::duration<double> elapsed =
+        telemetry.execution_time - telemetry.commit_state_time;
+    prometheus_->Observe(EXECUTION_DURATION, elapsed.count());
+  }
// Use automatic JSON serialization - this reads current state, doesn't modify
// it
@@ -847,7 +898,7 @@ void Stats::MonitorGlobal() {
LOG(ERROR) << " req client latency:"
<< static_cast<double>(run_req_run_time -
last_run_req_run_time) /
- (run_req_num - last_run_req_num) / 1000000000.0;
+ (run_req_num - last_run_req_num) / 1000000.0;
}
last_seq_fail = seq_fail;
@@ -987,13 +1038,19 @@ void Stats::IncCommit(uint64_t seq) {
}
}
-void Stats::IncPendingExecute() { pending_execute_++; }
+// Increments pending_execute_ and publishes the new count on PENDING_EXECUTE.
+void Stats::IncPendingExecute() {
+  // Pre-increment returns the value this call produced, so the gauge reports
+  // it without a second atomic load that another thread could interleave with.
+  const auto pending = ++pending_execute_;
+  if (prometheus_) {
+    prometheus_->Set(PENDING_EXECUTE, static_cast<double>(pending));
+  }
+}
void Stats::IncExecute() { execute_++; }
+// Counts one finished execution in execute_done_ and, when Prometheus is
+// enabled, in both the EXECUTE and EXECUTE_DONE series.
void Stats::IncExecuteDone() {
if (prometheus_) {
prometheus_->Inc(EXECUTE, 1);
+ // Incremented at the same point as EXECUTE, so the two series move together.
+ prometheus_->Inc(EXECUTE_DONE, 1);
}
execute_done_++;
}
@@ -1005,22 +1062,48 @@ void Stats::BroadCastMsg() {
broad_cast_msg_++;
}
-void Stats::SendBroadCastMsg(uint32_t num) { send_broad_cast_msg_ += num; }
+// Adds `num` sent broadcast messages to the local counter and, when
+// Prometheus is enabled, to SEND_BROADCAST_MSG.
+void Stats::SendBroadCastMsg(uint32_t num) {
+  send_broad_cast_msg_ += num;
+  if (!prometheus_) {
+    return;
+  }
+  prometheus_->Inc(SEND_BROADCAST_MSG, static_cast<double>(num));
+}
-void Stats::SendBroadCastMsgPerRep() { send_broad_cast_msg_per_rep_++; }
+// Counts one per-replica broadcast send locally and in SEND_BROADCAST_PER_REP.
+void Stats::SendBroadCastMsgPerRep() {
+  ++send_broad_cast_msg_per_rep_;
+  if (prometheus_ != nullptr) {
+    prometheus_->Inc(SEND_BROADCAST_PER_REP, 1);
+  }
+}
-void Stats::SeqFail() { seq_fail_++; }
+// Records one sequence failure in seq_fail_ and, if enabled, in SEQ_FAIL.
+void Stats::SeqFail() {
+  ++seq_fail_;
+  if (!prometheus_) return;
+  prometheus_->Inc(SEQ_FAIL, 1);
+}
+// Adds `num` requests to total_request_ and, when Prometheus is enabled, to
+// both the NUM_EXECUTE_TX and TOTAL_REQUEST series.
void Stats::IncTotalRequest(uint32_t num) {
if (prometheus_) {
prometheus_->Inc(NUM_EXECUTE_TX, num);
+ // Receives the same increment as NUM_EXECUTE_TX.
+ prometheus_->Inc(TOTAL_REQUEST, static_cast<double>(num));
}
total_request_ += num;
}
-void Stats::IncTotalGeoRequest(uint32_t num) { total_geo_request_ += num; }
+// Adds `num` geo requests to total_geo_request_ and to TOTAL_GEO_REQUEST.
+void Stats::IncTotalGeoRequest(uint32_t num) {
+  total_geo_request_ += num;
+  if (prometheus_ == nullptr) {
+    return;
+  }
+  const double increment = static_cast<double>(num);
+  prometheus_->Inc(TOTAL_GEO_REQUEST, increment);
+}
-void Stats::IncGeoRequest() { geo_request_++; }
+// Counts a single geo request locally and in GEO_REQUEST.
+void Stats::IncGeoRequest() {
+  ++geo_request_;
+  if (prometheus_) prometheus_->Inc(GEO_REQUEST, 1);
+}
void Stats::ServerCall() {
if (prometheus_) {
@@ -1036,11 +1119,19 @@ void Stats::ServerProcess() {
server_process_++;
}
-void Stats::SeqGap(uint64_t seq_gap) { seq_gap_ = seq_gap; }
+// Stores the latest sequence gap and publishes it to Prometheus as SEQ_GAP.
+void Stats::SeqGap(uint64_t seq_gap) {
+  seq_gap_ = seq_gap;
+  if (!prometheus_) {
+    return;
+  }
+  const double gap = static_cast<double>(seq_gap);
+  prometheus_->Set(SEQ_GAP, gap);
+}
+// Accumulates one client-request latency sample into run_req_num_ /
+// run_req_run_time_ and, when Prometheus is enabled, observes run_time / 1e6
+// on CLIENT_LATENCY.
+// NOTE(review): the /1e6 assumes run_time is in microseconds (yielding
+// seconds), while MonitorGlobal's log line previously divided by 1e9 —
+// confirm the unit at the AddLatency call sites.
void Stats::AddLatency(uint64_t run_time) {
run_req_num_++;
run_req_run_time_ += run_time;
+ if (prometheus_) {
+ prometheus_->Observe(CLIENT_LATENCY, run_time / 1e6);
+ }
}
void Stats::SetPrometheus(const std::string& prometheus_address) {
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 3f711ef9..0f29d8aa 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -289,6 +289,8 @@ class Stats {
nlohmann::json consensus_history_;
std::mutex consensus_history_mutex_; // Protect consensus_history_ access
+ // Last primary id passed to ChangePrimary()/SetPrimaryId(); -1 until the
+ // first call. A switch from a non-(-1) id to a different id increments
+ // VIEW_CHANGE.
+ // NOTE(review): plain int with no lock — confirm both setters run on the
+ // same thread, otherwise this needs std::atomic<int> or a mutex.
+ int previous_primary_id_ = -1; // Track for view change detection
+
std::unique_ptr<PrometheusHandler> prometheus_;
};