This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 789fa5095c5 Add pulsar_subscription_consumers_count metric (#15042)
789fa5095c5 is described below
commit 789fa5095c5182dbeb9874d185032549a50bc2b6
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Apr 19 15:28:28 2022 +0200
Add pulsar_subscription_consumers_count metric (#15042)
Fixes #15032
### Modifications
Added metric `pulsar_subscription_consumers_count`, which reports the number of consumers on each subscription.
---
.../pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java | 2 ++
.../apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 1 +
.../main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java | 2 ++
.../test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 1 +
4 files changed, 6 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index c829be28e59..b8e7c361339 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -60,5 +60,7 @@ public class AggregatedSubscriptionStats {
double msgDropRate;
+ long consumersCount;
+
public Map<Consumer, AggregatedConsumerStats> consumerStat = new
HashMap<>();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a3a0fcda445..8c9f8e32715 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -126,6 +126,7 @@ public class NamespaceStatsAggregator {
subsStats.lastConsumedFlowTimestamp =
subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp =
subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp =
subscriptionStats.lastMarkDeleteAdvancedTimestamp;
+ subsStats.consumersCount = subscriptionStats.consumers.size();
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 35dbdba274a..b45a09f06f1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -268,6 +268,8 @@ class TopicStats {
subsStats.totalMsgExpired,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_consumers_count",
+ subsStats.consumersCount,
splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
"pulsar_consumer_msg_rate_redeliver",
consumerStats.msgRateRedeliver,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index b62a7529fe3..2a6754bfb1c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -527,6 +527,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertTrue(metrics.containsKey("pulsar_out_messages_total"));
assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
+ assertTrue(metrics.containsKey("pulsar_subscription_consumers_count"));
}
@Test