This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new efc39937c2c add metric for InMemoryDelayedDeliveryTracker's memory
usage (#15867)
efc39937c2c is described below
commit efc39937c2cd54e777b7eab2ba5bf1b3bfd50734
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Aug 2 18:54:50 2022 +0800
add metric for InMemoryDelayedDeliveryTracker's memory usage (#15867)
---
.../delayed/InMemoryDelayedDeliveryTracker.java | 4 +
.../PersistentDispatcherMultipleConsumers.java | 14 ++++
.../service/persistent/PersistentSubscription.java | 6 ++
.../broker/service/persistent/PersistentTopic.java | 1 +
.../stats/prometheus/AggregatedNamespaceStats.java | 3 +
.../stats/prometheus/NamespaceStatsAggregator.java | 4 +
.../pulsar/broker/stats/prometheus/TopicStats.java | 5 ++
.../service/persistent/PersistentTopicTest.java | 90 ++++++++++++++++++++--
.../policies/data/stats/SubscriptionStatsImpl.java | 5 ++
.../common/policies/data/stats/TopicStatsImpl.java | 5 ++
10 files changed, 131 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 837d3d1872c..390ce5d5071 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -192,6 +192,10 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
return priorityQueue.size();
}
+ /** Returns {@code priorityQueue.bytesCapacity()}, exposed as this tracker's buffer memory usage. */
+ public long getBufferMemoryUsage() {
+ return priorityQueue.bytesCapacity();
+ }
+
/**
* Update the scheduled timer task such that:
* 1. If there are no delayed messages, return and do not schedule a timer
task.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 167bc188e99..a02b76c9aed 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -46,6 +46,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -1099,6 +1100,19 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
return topic;
}
+
+    /**
+     * Returns the buffer memory usage of the delayed-delivery tracker, or 0 when no tracker
+     * is present or it is not an {@link InMemoryDelayedDeliveryTracker}.
+     */
+    public long getDelayedTrackerMemoryUsage() {
+        // Read the field once so the type check and the cast always see the same tracker.
+        return delayedDeliveryTracker
+                .filter(InMemoryDelayedDeliveryTracker.class::isInstance)
+                .map(tracker -> ((InMemoryDelayedDeliveryTracker) tracker).getBufferMemoryUsage())
+                .orElse(0L);
+    }
+
protected int getStickyKeyHash(Entry entry) {
return
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 22f94ee0545..a88744b4edc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1099,6 +1099,12 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
subStats.activeConsumerName = activeConsumer.consumerName();
}
}
+
+ if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
+ subStats.delayedTrackerMemoryUsage =
+ ((PersistentDispatcherMultipleConsumers)
dispatcher).getDelayedTrackerMemoryUsage();
+ }
+
if (Subscription.isIndividualAckMode(subType)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers d =
(PersistentDispatcherMultipleConsumers) dispatcher;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 335355b112b..00e4bc01ec4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1886,6 +1886,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
stats.nonContiguousDeletedMessagesRanges +=
subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
+ stats.delayedMessageIndexSizeInBytes +=
subStats.delayedTrackerMemoryUsage;
});
replicators.forEach((cluster, replicator) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 5610dbab218..761094ac0e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -59,6 +59,7 @@ public class AggregatedNamespaceStats {
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
+ long delayedTrackerMemoryUsage; // bytes summed over every topic in the namespace; long avoids int overflow
void updateStats(TopicStats stats) {
topicsCount++;
@@ -76,6 +77,7 @@ public class AggregatedNamespaceStats {
msgInCounter += stats.msgInCounter;
bytesOutCounter += stats.bytesOutCounter;
msgOutCounter += stats.msgOutCounter;
+ delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize +=
stats.managedLedgerStats.storageLogicalSize;
@@ -156,5 +158,6 @@ public class AggregatedNamespaceStats {
replicationStats.clear();
subscriptionStats.clear();
+ delayedTrackerMemoryUsage = 0;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 69e25755621..c3e8567de54 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -184,6 +184,7 @@ public class NamespaceStatsAggregator {
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
+ stats.delayedTrackerMemoryUsage =
tStatus.delayedMessageIndexSizeInBytes;
stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
@@ -348,6 +349,9 @@ public class NamespaceStatsAggregator {
metric(stream, cluster, namespace, "pulsar_subscription_delayed",
stats.msgDelayed);
+ metric(stream, cluster, namespace,
"pulsar_delayed_message_index_size_bytes",
+ stats.delayedTrackerMemoryUsage);
+
metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_msg_backlog", "local", stats.msgBacklog);
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 7f38a323cdc..99838ccfae9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -67,6 +67,7 @@ class TopicStats {
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
+ public int delayedTrackerMemoryUsage;
public void reset() {
subscriptionsCount = 0;
@@ -100,6 +101,7 @@ class TopicStats {
compactionCompactedEntriesCount = 0;
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
+ delayedTrackerMemoryUsage = 0;
}
static void resetTypes() {
@@ -148,6 +150,9 @@ class TopicStats {
metric(stream, cluster, namespace, topic,
"pulsar_storage_backlog_quota_limit_time",
stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic,
"pulsar_delayed_message_index_size_bytes",
+ stats.delayedTrackerMemoryUsage,
splitTopicAndPartitionIndexLabel);
+
long[] latencyBuckets =
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
splitTopicAndPartitionIndexLabel);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index a150e039d77..a62e8762ac5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,28 +29,33 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
+import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -269,4 +274,77 @@ public class PersistentTopicTest extends BrokerTestBase {
producer.close();
}
}
+
+
+ /** Supplies (topic, exposeTopicLevelMetrics) pairs: the same topic, once with the flag true and once false. */
+ @DataProvider(name = "topicAndMetricsLevel")
+ public Object[][] indexPatternTestData() {
+ return new Object[][]{
+ new Object[]
{"persistent://prop/autoNs/test_delayed_message_metric", true},
+ new Object[]
{"persistent://prop/autoNs/test_delayed_message_metric", false},
+ };
+ }
+
+
+ /**
+ * Publishes 100 messages with a 30 second delivery delay to a Shared subscription, generates the
+ * Prometheus metrics output and asserts that "pulsar_delayed_message_index_size_bytes" is reported
+ * with a positive value, matched by topic tag or by namespace tag depending on
+ * {@code exposeTopicLevelMetrics}.
+ */
+ @Test(dataProvider = "topicAndMetricsLevel")
+ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic,
boolean exposeTopicLevelMetrics) throws Exception {
+ PulsarClient client = pulsar.getClient();
+ String namespace = TopicName.get(topic).getNamespace();
+ admin.namespaces().createNamespace(namespace);
+
+ final int messages = 100;
+ CountDownLatch latch = new CountDownLatch(messages);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test_sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener((MessageListener<String>) (consumer1, msg) ->
{
+ try {
+ latch.countDown();
+ consumer1.acknowledge(msg);
+ } catch (PulsarClientException e) {
+ // NOTE(review): an acknowledge failure is only printed and does not fail the test.
+ e.printStackTrace();
+ }
+ })
+ .subscribe();
+ for (int a = 0; a < messages; a++) {
+ producer.newMessage()
+ .value(UUID.randomUUID().toString())
+ .deliverAfter(30, TimeUnit.SECONDS)
+ .sendAsync();
+ }
+ producer.flush();
+
+ // NOTE(review): messages are sent with a 30s delay, so the latch presumably cannot reach zero
+ // within this 10s wait; the call then acts as a fixed pause and its result is ignored - confirm.
+ latch.await(10, TimeUnit.SECONDS);
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics,
true, true, output);
+ String metricsStr = output.toString(StandardCharsets.UTF_8);
+
+ Multimap<String, PrometheusMetricsTest.Metric> metricsMap =
PrometheusMetricsTest.parseMetrics(metricsStr);
+ Collection<PrometheusMetricsTest.Metric> metrics =
metricsMap.get("pulsar_delayed_message_index_size_bytes");
+ Assert.assertTrue(metrics.size() > 0);
+
+ // Count the positive-valued samples that match this test's topic or namespace tag.
+ int topicLevelNum = 0;
+ int namespaceLevelNum = 0;
+ for (PrometheusMetricsTest.Metric metric : metrics) {
+ if (exposeTopicLevelMetrics &&
metric.tags.get("topic").equals(topic)) {
+ Assert.assertTrue(metric.value > 0);
+ topicLevelNum++;
+ } else if (!exposeTopicLevelMetrics &&
metric.tags.get("namespace").equals(namespace)) {
+ Assert.assertTrue(metric.value > 0);
+ namespaceLevelNum++;
+ }
+ }
+
+ // NOTE(review): the two assertEquals calls below pass the literal 0 in opposite argument
+ // positions; Assert is org.junit.Assert in this file (expected value first) - confirm.
+ if (exposeTopicLevelMetrics) {
+ Assert.assertTrue(topicLevelNum > 0);
+ Assert.assertEquals(0, namespaceLevelNum);
+ } else {
+ Assert.assertTrue(namespaceLevelNum > 0);
+ Assert.assertEquals(topicLevelNum, 0);
+ }
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index cd163d84823..a945ae9cf86 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -131,6 +131,9 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;
+ /** The size of InMemoryDelayedDeliveryTracker memory usage. */
+ public long delayedTrackerMemoryUsage;
+
/** SubscriptionProperties (key/value strings) associated with this
subscribe. */
public Map<String, String> subscriptionProperties;
@@ -158,6 +161,7 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
consumersAfterMarkDeletePosition.clear();
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
+ delayedTrackerMemoryUsage = 0;
subscriptionProperties.clear();
}
@@ -193,6 +197,7 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
this.nonContiguousDeletedMessagesRanges +=
stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize +=
stats.nonContiguousDeletedMessagesRangesSerializedSize;
+ this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
this.subscriptionProperties.putAll(stats.subscriptionProperties);
return this;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 3e90ca9be94..f6cdbba5036 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -131,6 +131,9 @@ public class TopicStatsImpl implements TopicStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;
+ /** The size of InMemoryDelayedDeliveryTracker memory usage. */
+ public int delayedMessageIndexSizeInBytes;
+
/** The compaction stats. */
public CompactionStatsImpl compaction;
@@ -200,6 +203,7 @@ public class TopicStatsImpl implements TopicStats {
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
this.publishRateLimitedTimes = 0L;
+ this.delayedMessageIndexSizeInBytes = 0;
this.compaction.reset();
}
@@ -226,6 +230,7 @@ public class TopicStatsImpl implements TopicStats {
this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges +=
stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize +=
stats.nonContiguousDeletedMessagesRangesSerializedSize;
+ this.delayedMessageIndexSizeInBytes +=
stats.delayedMessageIndexSizeInBytes;
stats.getPublishers().forEach(s -> {
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {