This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 99cc4a96475552ce14be8338005a45bedda38b55 Author: gaozhangmin <[email protected]> AuthorDate: Tue Feb 22 09:56:30 2022 +0800 non-persistent topic metrics (#13827) ### Motivation Non-persistent topic doesn't have subscription metrics ### Modifications Expose a new non-persistent subscription metric: `pulsar_subscription_msg_drop_rate` (cherry picked from commit d548bc4ef3b785325b43c737f347ac702a840602) --- .../prometheus/AggregatedSubscriptionStats.java | 2 + .../stats/prometheus/NamespaceStatsAggregator.java | 81 +++++++++++++--------- .../pulsar/broker/stats/prometheus/TopicStats.java | 2 + .../pulsar/broker/stats/PrometheusMetricsTest.java | 40 ++++++++++- 4 files changed, 93 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index fb74daf419f..c829be28e59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats { long totalMsgExpired; + double msgDropRate; + public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 7e36c5612d0..a3a0fcda445 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -31,7 +31,10 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl; +import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; +import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.compaction.CompactedTopicContext; @@ -109,6 +112,37 @@ public class NamespaceStatsAggregator { return Optional.ofNullable(compactor).map(c -> c.getStats()); } + private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subscriptionStats, + AggregatedSubscriptionStats subsStats) { + stats.subscriptionsCount++; + stats.msgBacklog += subscriptionStats.msgBacklog; + subsStats.msgBacklog = subscriptionStats.msgBacklog; + subsStats.msgDelayed = subscriptionStats.msgDelayed; + subsStats.msgRateExpired = subscriptionStats.msgRateExpired; + subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; + subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; + subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; + subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp; + subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp; + subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp; + subscriptionStats.consumers.forEach(cStats -> { + stats.consumersCount++; + subsStats.unackedMessages += cStats.unackedMessages; + subsStats.msgRateRedeliver += cStats.msgRateRedeliver; + subsStats.msgRateOut += cStats.msgRateOut; + subsStats.msgThroughputOut += cStats.msgThroughputOut; + subsStats.bytesOutCounter += cStats.bytesOutCounter; + subsStats.msgOutCounter += cStats.msgOutCounter; + if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { + subsStats.blockedSubscriptionOnUnackedMsgs = true; + } + }); + stats.rateOut += subsStats.msgRateOut; + stats.throughputOut += subsStats.msgThroughputOut; + + } + private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize, Optional<CompactorMXBean> compactorMXBean) { @@ -141,7 +175,6 @@ public class NamespaceStatsAggregator { stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate(); stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate(); } - TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false); stats.msgInCounter = tStatus.msgInCounter; stats.bytesInCounter = tStatus.bytesInCounter; @@ -175,37 +208,23 @@ public class NamespaceStatsAggregator { } }); - tStatus.subscriptions.forEach((subName, subscriptionStats) -> { - stats.subscriptionsCount++; - stats.msgBacklog += subscriptionStats.msgBacklog; - - AggregatedSubscriptionStats subsStats = stats.subscriptionStats - .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); - subsStats.msgBacklog = subscriptionStats.msgBacklog; - subsStats.msgDelayed = subscriptionStats.msgDelayed; - subsStats.msgRateExpired = subscriptionStats.msgRateExpired; - subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; - subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; - subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; - subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp; - subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; - subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp; - subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp; - subscriptionStats.consumers.forEach(cStats -> { - stats.consumersCount++; - subsStats.unackedMessages += cStats.unackedMessages; - subsStats.msgRateRedeliver += cStats.msgRateRedeliver; - subsStats.msgRateOut += cStats.msgRateOut; - subsStats.msgThroughputOut += cStats.msgThroughputOut; - subsStats.bytesOutCounter += cStats.bytesOutCounter; - subsStats.msgOutCounter += cStats.msgOutCounter; - if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { - subsStats.blockedSubscriptionOnUnackedMsgs = true; - } + if (topic instanceof PersistentTopic) { + tStatus.subscriptions.forEach((subName, subscriptionStats) -> { + AggregatedSubscriptionStats subsStats = stats.subscriptionStats + .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); + aggregateTopicStats(stats, subscriptionStats, subsStats); }); - stats.rateOut += subsStats.msgRateOut; - stats.throughputOut += subsStats.msgThroughputOut; - }); + } else { + ((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions() + .forEach((subName, nonPersistentSubscriptionStats) -> { + NonPersistentSubscriptionStatsImpl subscriptionStats = + (NonPersistentSubscriptionStatsImpl) nonPersistentSubscriptionStats; + AggregatedSubscriptionStats subsStats = stats.subscriptionStats + .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); + aggregateTopicStats(stats, subscriptionStats, subsStats); + subsStats.msgDropRate += subscriptionStats.getMsgDropRate(); + }); + } // Consumer stats can be a lot if a subscription has many consumers if (includeConsumerMetrics) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index f0556e8da90..35dbdba274a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -266,6 +266,8 @@ class TopicStats { subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired", subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate", + subsStats.msgDropRate, splitTopicAndPartitionIndexLabel); subsStats.consumerStat.forEach((c, consumerStats) -> { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 3f64619a983..c4823c8b0e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -455,7 +455,6 @@ public class PrometheusMetricsTest extends BrokerTestBase { .subscribe(); final int messages = 10; - for (int i = 0; i < messages; i++) { String message = "my-message-" + i; p1.send(message.getBytes()); @@ -491,6 +490,45 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count")); } + @Test + public void testNonPersistentSubMetrics() throws Exception { + Producer<byte[]> p1 = + pulsarClient.newProducer().topic("non-persistent://my-property/use/my-ns/my-topic1").create(); + + Consumer<byte[]> c1 = pulsarClient.newConsumer() + .topic("non-persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + final int messages = 100; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + } + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap<String, Metric> metrics = parseMetrics(metricsStr); + assertTrue(metrics.containsKey("pulsar_subscription_back_log")); + assertTrue(metrics.containsKey("pulsar_subscription_back_log_no_delayed")); + assertTrue(metrics.containsKey("pulsar_subscription_msg_throughput_out")); + assertTrue(metrics.containsKey("pulsar_throughput_out")); + assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_redeliver")); + assertTrue(metrics.containsKey("pulsar_subscription_unacked_messages")); + assertTrue(metrics.containsKey("pulsar_subscription_blocked_on_unacked_messages")); + assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_out")); + assertTrue(metrics.containsKey("pulsar_out_bytes_total")); + assertTrue(metrics.containsKey("pulsar_out_messages_total")); + assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp")); + assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate")); + } + @Test public void testPerNamespaceStats() throws Exception { Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
