This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6f0f3fb35647a5ef3ff9e23d393fe8e7346199ee Author: feynmanlin <[email protected]> AuthorDate: Sat Dec 12 02:12:11 2020 +0800 Export Prometheus metric for messageTTL (#8871) Fixes #8573 Some users want to know how many messages have expired and when. Currently there are too few such metrics, so TTL looks like an unobservable black box. Add `totalMsgExpired`, `lastExpireTimestamp`, and `msgRateExpired` as Prometheus metrics. Covered by PrometheusMetricsTest.java. (cherry picked from commit 060e35b587beaf74484ecc58fe7e0b91d4cf630e) --- .../persistent/PersistentMessageExpiryMonitor.java | 9 +- .../service/persistent/PersistentSubscription.java | 1 + .../prometheus/AggregatedSubscriptionStats.java | 6 ++ .../stats/prometheus/NamespaceStatsAggregator.java | 3 + .../pulsar/broker/stats/prometheus/TopicStats.java | 36 +++++-- .../pulsar/broker/stats/PrometheusMetricsTest.java | 104 +++++++++++++++++++++ .../common/policies/data/SubscriptionStats.java | 5 + 7 files changed, 153 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 0380c32..315a0a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -20,7 +20,7 @@ package org.apache.pulsar.broker.service.persistent; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - +import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; @@ -40,6 +40,7 @@ public class 
PersistentMessageExpiryMonitor implements FindEntryCallback { private final String subName; private final String topicName; private final Rate msgExpired; + private final LongAdder totalMsgExpired; private final boolean autoSkipNonRecoverableData; private final PersistentSubscription subscription; @@ -56,6 +57,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { this.subName = subscriptionName; this.subscription = subscription; this.msgExpired = new Rate(); + this.totalMsgExpired = new LongAdder(); // check to avoid test failures this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); @@ -97,6 +99,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { return msgExpired.getRate(); } + public long getTotalMessageExpired() { + return totalMsgExpired.sum(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentMessageExpiryMonitor.class); private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() { @@ -104,6 +110,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { public void markDeleteComplete(Object ctx) { long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false); msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */); + totalMsgExpired.add(numMessagesExpired); updateRates(); // If the subscription is a Key_Shared subscription, we should to trigger message dispatch. 
if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 0cc4e97..e75763f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -936,6 +936,7 @@ public class PersistentSubscription implements Subscription { subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog); subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed; subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate(); + subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired(); subStats.isReplicated = isReplicated(); subStats.isDurable = cursor.isDurable(); if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index 1f1e879..f3573f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -45,5 +45,11 @@ public class AggregatedSubscriptionStats { long bytesOutCounter; + long lastExpireTimestamp; + + double msgRateExpired; + + long totalMsgExpired; + public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index ab2e670..3e52c44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -138,6 +138,9 @@ public class NamespaceStatsAggregator { .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; + subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; + subsStats.msgRateExpired = subscriptionStats.msgRateExpired; + subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; subscriptionStats.consumers.forEach(cStats -> { stats.consumersCount++; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index dd065d4..d27e863 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -155,16 +155,32 @@ class TopicStats { metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum()); stats.subscriptionStats.forEach((n, subsStats) -> { - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", subsStats.msgBacklogNoDelayed); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver); - metric(stream, cluster, 
namespace, topic, n, "pulsar_subscription_unacked_messages", subsStats.unackedMessages); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut); - metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter); - metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", + subsStats.msgBacklog); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", + subsStats.msgBacklogNoDelayed); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", + subsStats.msgDelayed); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", + subsStats.msgRateRedeliver); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", + subsStats.unackedMessages); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", + subsStats.blockedSubscriptionOnUnackedMsgs ? 
1 : 0); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", + subsStats.msgRateOut); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", + subsStats.msgThroughputOut); + metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", + subsStats.bytesOutCounter); + metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", + subsStats.msgOutCounter); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp", + subsStats.lastExpireTimestamp); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired", + subsStats.msgRateExpired); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired", + subsStats.totalMsgExpired); subsStats.consumerStat.forEach((c, consumerStats) -> { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index b5e780e..cba6d1e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -25,19 +25,29 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.math.RoundingMode; +import java.text.NumberFormat; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.HashMap; import java.util.TreeMap; import java.util.UUID; +import 
java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -186,6 +196,100 @@ public class PrometheusMetricsTest extends BrokerTestBase { } @Test + public void testPerTopicExpiredStat() throws Exception { + String ns = "prop/ns-abc1"; + admin.namespaces().createNamespace(ns); + String topic1 = "persistent://" + ns + "/testPerTopicExpiredStat1"; + String topic2 = "persistent://" + ns + "/testPerTopicExpiredStat2"; + List<String> topicList = Arrays.asList(topic2,topic1); + Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create(); + Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create(); + final String subName = "test"; + for (String topic : topicList) { + pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscribe().close(); + } + + final int messages = 10; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + p2.send(message.getBytes()); + } + + p1.close(); + p2.close(); + // Let the message expire + for (String topic : topicList) { + PersistentTopic persistentTopic = (PersistentTopic)pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1); + } + 
pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry); + //wait for checkMessageExpiry + PersistentSubscription sub = (PersistentSubscription) + pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName); + PersistentSubscription sub2 = (PersistentSubscription) + pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> sub.getExpiredMessageRate() != 0.0); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> sub2.getExpiredMessageRate() != 0.0); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut); + String metricsStr = new String(statsOut.toByteArray()); + Multimap<String, Metric> metrics = parseMetrics(metricsStr); + // There should be 2 metrics with different tags for each topic + List<Metric> cm = (List<Metric>) metrics.get("pulsar_subscription_last_expire_timestamp"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), topic2); + assertEquals(cm.get(0).tags.get("namespace"), ns); + assertEquals(cm.get(1).tags.get("topic"), topic1); + assertEquals(cm.get(1).tags.get("namespace"), ns); + + //check value + Field field = PersistentSubscription.class.getDeclaredField("lastExpireTimestamp"); + field.setAccessible(true); + for (int i = 0; i < topicList.size(); i++) { + PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName); + assertEquals((long) field.get(subscription), (long) cm.get(i).value); + } + + cm = (List<Metric>) metrics.get("pulsar_subscription_msg_rate_expired"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), topic2); + assertEquals(cm.get(0).tags.get("namespace"), ns); + assertEquals(cm.get(1).tags.get("topic"), topic1); + assertEquals(cm.get(1).tags.get("namespace"), ns); + 
//check value + field = PersistentSubscription.class.getDeclaredField("expiryMonitor"); + field.setAccessible(true); + NumberFormat nf = NumberFormat.getNumberInstance(); + nf.setMaximumFractionDigits(3); + nf.setRoundingMode(RoundingMode.DOWN); + for (int i = 0; i < topicList.size(); i++) { + PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName); + PersistentMessageExpiryMonitor monitor = (PersistentMessageExpiryMonitor) field.get(subscription); + assertEquals(Double.valueOf(nf.format(monitor.getMessageExpiryRate())).doubleValue(), cm.get(i).value); + } + + cm = (List<Metric>) metrics.get("pulsar_subscription_total_msg_expired"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(0).tags.get("topic"), topic2); + assertEquals(cm.get(0).tags.get("namespace"), ns); + assertEquals(cm.get(1).tags.get("topic"), topic1); + assertEquals(cm.get(1).tags.get("namespace"), ns); + //check value + for (int i = 0; i < topicList.size(); i++) { + assertEquals(messages, (long)cm.get(i).value); + } + + } + + @Test public void testPerNamespaceStats() throws Exception { Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index c3a09ab..12c5767 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -74,6 +74,9 @@ public class SubscriptionStats { /** Total rate of messages expired on this subscription (msg/s). 
*/ public double msgRateExpired; + /** Total messages expired on this subscription. */ + public long totalMsgExpired; + /** Last message expire execution timestamp. */ public long lastExpireTimestamp; @@ -119,6 +122,7 @@ public class SubscriptionStats { msgBacklogNoDelayed = 0; unackedMessages = 0; msgRateExpired = 0; + totalMsgExpired = 0; lastExpireTimestamp = 0L; consumers.clear(); consumersAfterMarkDeletePosition.clear(); @@ -139,6 +143,7 @@ public class SubscriptionStats { this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed; this.unackedMessages += stats.unackedMessages; this.msgRateExpired += stats.msgRateExpired; + this.totalMsgExpired += stats.totalMsgExpired; this.isReplicated |= stats.isReplicated; this.isDurable |= stats.isDurable; if (this.consumers.size() != stats.consumers.size()) {
