This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 9339fd4d98a [broker][monitoring][fix] fix
`pulsar_subscription_msg_ack_rate` (#16866)
9339fd4d98a is described below
commit 9339fd4d98a4af971924d7406df8301e8c9c31b0
Author: Tao Jiuming <[email protected]>
AuthorDate: Thu Aug 4 12:23:28 2022 +0800
[broker][monitoring][fix] fix `pulsar_subscription_msg_ack_rate` (#16866)
---
.../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 1 +
.../java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java | 9 +++++++++
2 files changed, 10 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 0212c3e964a..e70f092af80 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -196,6 +196,7 @@ public class NamespaceStatsAggregator {
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
+ subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index af19e889e9b..24220625f91 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -300,6 +300,8 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
Multimap<String, PrometheusMetricsTest.Metric> metricsMap =
PrometheusMetricsTest.parseMetrics(metricStr);
Collection<PrometheusMetricsTest.Metric> ackRateMetric =
metricsMap.get("pulsar_consumer_msg_ack_rate");
+ Collection<PrometheusMetricsTest.Metric> subAckRateMetrics =
metricsMap.get("pulsar_subscription_msg_ack_rate");
+
String rateOutMetricName = exposeTopicLevelMetrics ?
"pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
Collection<PrometheusMetricsTest.Metric> rateOutMetric =
metricsMap.get(rateOutMetricName);
Assert.assertTrue(ackRateMetric.size() > 0);
@@ -316,9 +318,16 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
.filter(metric ->
metric.tags.get("consumer_name").equals(consumer1Name)
||
metric.tags.get("consumer_name").equals(consumer2Name))
.mapToDouble(metric -> metric.value).sum();
+ double subAckRate = subAckRateMetrics
+ .stream()
+ .filter(m -> m.tags.get("subscription").equals(subName))
+ .mapToDouble(m -> m.value)
+ .sum();
+ Assert.assertEquals(subAckRateMetrics.size(), 1);
Assert.assertTrue(totalAckRate > 0D);
Assert.assertTrue(totalRateOut > 0D);
+ Assert.assertEquals(totalAckRate, subAckRate, 0.1D * totalAckRate);
Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut *
0.1D);
} else {
double totalAckRate = ackRateMetric.stream()