This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new efc39937c2c add metric for InMemoryDelayedDeliveryTracker's memory 
usage  (#15867)
efc39937c2c is described below

commit efc39937c2cd54e777b7eab2ba5bf1b3bfd50734
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Aug 2 18:54:50 2022 +0800

    add metric for InMemoryDelayedDeliveryTracker's memory usage  (#15867)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  4 +
 .../PersistentDispatcherMultipleConsumers.java     | 14 ++++
 .../service/persistent/PersistentSubscription.java |  6 ++
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  3 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  4 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  5 ++
 .../service/persistent/PersistentTopicTest.java    | 90 ++++++++++++++++++++--
 .../policies/data/stats/SubscriptionStatsImpl.java |  5 ++
 .../common/policies/data/stats/TopicStatsImpl.java |  5 ++
 10 files changed, 131 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 837d3d1872c..390ce5d5071 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -192,6 +192,10 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         return priorityQueue.size();
     }
 
+    public long getBufferMemoryUsage() {
+        return priorityQueue.bytesCapacity();
+    }
+
     /**
      * Update the scheduled timer task such that:
      * 1. If there are no delayed messages, return and do not schedule a timer 
task.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 167bc188e99..a02b76c9aed 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -46,6 +46,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -1099,6 +1100,19 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         return topic;
     }
 
+
+    public long getDelayedTrackerMemoryUsage() {
+        if (delayedDeliveryTracker.isEmpty()) {
+            return 0;
+        }
+
+        if (delayedDeliveryTracker.get() instanceof 
InMemoryDelayedDeliveryTracker) {
+            return ((InMemoryDelayedDeliveryTracker) 
delayedDeliveryTracker.get()).getBufferMemoryUsage();
+        }
+
+        return 0;
+    }
+
     protected int getStickyKeyHash(Entry entry) {
         return 
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 22f94ee0545..a88744b4edc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1099,6 +1099,12 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                 subStats.activeConsumerName = activeConsumer.consumerName();
             }
         }
+
+        if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subStats.delayedTrackerMemoryUsage =
+                    ((PersistentDispatcherMultipleConsumers) 
dispatcher).getDelayedTrackerMemoryUsage();
+        }
+
         if (Subscription.isIndividualAckMode(subType)) {
             if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
                 PersistentDispatcherMultipleConsumers d = 
(PersistentDispatcherMultipleConsumers) dispatcher;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 335355b112b..00e4bc01ec4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1886,6 +1886,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             stats.nonContiguousDeletedMessagesRanges += 
subStats.nonContiguousDeletedMessagesRanges;
             stats.nonContiguousDeletedMessagesRangesSerializedSize +=
                     subStats.nonContiguousDeletedMessagesRangesSerializedSize;
+            stats.delayedMessageIndexSizeInBytes += 
subStats.delayedTrackerMemoryUsage;
         });
 
         replicators.forEach((cluster, replicator) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 5610dbab218..761094ac0e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -59,6 +59,7 @@ public class AggregatedNamespaceStats {
     long compactionCompactedEntriesCount;
     long compactionCompactedEntriesSize;
     StatsBuckets compactionLatencyBuckets = new 
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
+    int delayedTrackerMemoryUsage;
 
     void updateStats(TopicStats stats) {
         topicsCount++;
@@ -76,6 +77,7 @@ public class AggregatedNamespaceStats {
         msgInCounter += stats.msgInCounter;
         bytesOutCounter += stats.bytesOutCounter;
         msgOutCounter += stats.msgOutCounter;
+        delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
 
         managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
         managedLedgerStats.storageLogicalSize += 
stats.managedLedgerStats.storageLogicalSize;
@@ -156,5 +158,6 @@ public class AggregatedNamespaceStats {
 
         replicationStats.clear();
         subscriptionStats.clear();
+        delayedTrackerMemoryUsage = 0;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 69e25755621..c3e8567de54 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -184,6 +184,7 @@ public class NamespaceStatsAggregator {
         stats.bytesOutCounter = tStatus.bytesOutCounter;
         stats.averageMsgSize = tStatus.averageMsgSize;
         stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
+        stats.delayedTrackerMemoryUsage = 
tStatus.delayedMessageIndexSizeInBytes;
 
         stats.producersCount = 0;
         topic.getProducers().values().forEach(producer -> {
@@ -348,6 +349,9 @@ public class NamespaceStatsAggregator {
 
         metric(stream, cluster, namespace, "pulsar_subscription_delayed", 
stats.msgDelayed);
 
+        metric(stream, cluster, namespace, 
"pulsar_delayed_message_index_size_bytes",
+                stats.delayedTrackerMemoryUsage);
+
         metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_msg_backlog", "local", stats.msgBacklog);
 
         stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 7f38a323cdc..99838ccfae9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -67,6 +67,7 @@ class TopicStats {
     long compactionCompactedEntriesCount;
     long compactionCompactedEntriesSize;
     StatsBuckets compactionLatencyBuckets = new 
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
+    public int delayedTrackerMemoryUsage;
 
     public void reset() {
         subscriptionsCount = 0;
@@ -100,6 +101,7 @@ class TopicStats {
         compactionCompactedEntriesCount = 0;
         compactionCompactedEntriesSize = 0;
         compactionLatencyBuckets.reset();
+        delayedTrackerMemoryUsage = 0;
     }
 
     static void resetTypes() {
@@ -148,6 +150,9 @@ class TopicStats {
         metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_quota_limit_time",
                 stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
 
+        metric(stream, cluster, namespace, topic, 
"pulsar_delayed_message_index_size_bytes",
+                stats.delayedTrackerMemoryUsage, 
splitTopicAndPartitionIndexLabel);
+
         long[] latencyBuckets = 
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
         metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
                 splitTopicAndPartitionIndexLabel);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index a150e039d77..a62e8762ac5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,28 +29,33 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -269,4 +274,77 @@ public class PersistentTopicTest extends BrokerTestBase {
             producer.close();
         }
     }
+
+
+    @DataProvider(name = "topicAndMetricsLevel")
+    public Object[][] indexPatternTestData() {
+        return new Object[][]{
+                new Object[] 
{"persistent://prop/autoNs/test_delayed_message_metric", true},
+                new Object[] 
{"persistent://prop/autoNs/test_delayed_message_metric", false},
+        };
+    }
+
+
+    @Test(dataProvider = "topicAndMetricsLevel")
+    public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, 
boolean exposeTopicLevelMetrics) throws Exception {
+        PulsarClient client = pulsar.getClient();
+        String namespace = TopicName.get(topic).getNamespace();
+        admin.namespaces().createNamespace(namespace);
+
+        final int messages = 100;
+        CountDownLatch latch = new CountDownLatch(messages);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test_sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener((MessageListener<String>) (consumer1, msg) -> 
{
+                    try {
+                        latch.countDown();
+                        consumer1.acknowledge(msg);
+                    } catch (PulsarClientException e) {
+                        e.printStackTrace();
+                    }
+                })
+                .subscribe();
+        for (int a = 0; a < messages; a++) {
+            producer.newMessage()
+                    .value(UUID.randomUUID().toString())
+                    .deliverAfter(30, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+        producer.flush();
+
+        latch.await(10, TimeUnit.SECONDS);
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
+        String metricsStr = output.toString(StandardCharsets.UTF_8);
+
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
+        Collection<PrometheusMetricsTest.Metric> metrics = 
metricsMap.get("pulsar_delayed_message_index_size_bytes");
+        Assert.assertTrue(metrics.size() > 0);
+
+        int topicLevelNum = 0;
+        int namespaceLevelNum = 0;
+        for (PrometheusMetricsTest.Metric metric : metrics) {
+            if (exposeTopicLevelMetrics && 
metric.tags.get("topic").equals(topic)) {
+                Assert.assertTrue(metric.value > 0);
+                topicLevelNum++;
+            } else if (!exposeTopicLevelMetrics && 
metric.tags.get("namespace").equals(namespace)) {
+                Assert.assertTrue(metric.value > 0);
+                namespaceLevelNum++;
+            }
+        }
+
+        if (exposeTopicLevelMetrics) {
+            Assert.assertTrue(topicLevelNum > 0);
+            Assert.assertEquals(0, namespaceLevelNum);
+        } else {
+            Assert.assertTrue(namespaceLevelNum > 0);
+            Assert.assertEquals(topicLevelNum, 0);
+        }
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index cd163d84823..a945ae9cf86 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -131,6 +131,9 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
     /** The serialized size of non-contiguous deleted messages ranges. */
     public int nonContiguousDeletedMessagesRangesSerializedSize;
 
+    /** The memory used by the InMemoryDelayedDeliveryTracker, in bytes. */
+    public long delayedTrackerMemoryUsage;
+
     /** SubscriptionProperties (key/value strings) associated with this 
subscribe. */
     public Map<String, String> subscriptionProperties;
 
@@ -158,6 +161,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         consumersAfterMarkDeletePosition.clear();
         nonContiguousDeletedMessagesRanges = 0;
         nonContiguousDeletedMessagesRangesSerializedSize = 0;
+        delayedTrackerMemoryUsage = 0;
         subscriptionProperties.clear();
     }
 
@@ -193,6 +197,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
         this.nonContiguousDeletedMessagesRanges += 
stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += 
stats.nonContiguousDeletedMessagesRangesSerializedSize;
+        this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
         this.subscriptionProperties.putAll(stats.subscriptionProperties);
         return this;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 3e90ca9be94..f6cdbba5036 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -131,6 +131,9 @@ public class TopicStatsImpl implements TopicStats {
     /** The serialized size of non-contiguous deleted messages ranges. */
     public int nonContiguousDeletedMessagesRangesSerializedSize;
 
+    /** The memory used by the InMemoryDelayedDeliveryTracker, in bytes. */
+    public int delayedMessageIndexSizeInBytes;
+
     /** The compaction stats. */
     public CompactionStatsImpl compaction;
 
@@ -200,6 +203,7 @@ public class TopicStatsImpl implements TopicStats {
         this.lastOffloadFailureTimeStamp = 0;
         this.lastOffloadSuccessTimeStamp = 0;
         this.publishRateLimitedTimes = 0L;
+        this.delayedMessageIndexSizeInBytes = 0;
         this.compaction.reset();
     }
 
@@ -226,6 +230,7 @@ public class TopicStatsImpl implements TopicStats {
         this.offloadedStorageSize += stats.offloadedStorageSize;
         this.nonContiguousDeletedMessagesRanges += 
stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += 
stats.nonContiguousDeletedMessagesRangesSerializedSize;
+        this.delayedMessageIndexSizeInBytes += 
stats.delayedMessageIndexSizeInBytes;
 
         stats.getPublishers().forEach(s -> {
            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {

Reply via email to