This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 68c10eed760 [feat][broker][PIP-195] Add metrics for bucket delayed message tracker (#19716)
68c10eed760 is described below
commit 68c10eed7604aa3dcc3a6d8b548575e99b94dca2
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Mar 30 16:32:13 2023 +0800
[feat][broker][PIP-195] Add metrics for bucket delayed message tracker (#19716)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 2 +-
.../broker/delayed/DelayedDeliveryTracker.java | 6 -
.../delayed/InMemoryDelayedDeliveryTracker.java | 5 -
.../bucket/BookkeeperBucketSnapshotStorage.java | 15 +--
.../bucket/BucketDelayedDeliveryTracker.java | 74 ++++++++---
.../bucket/BucketDelayedMessageIndexStats.java | 146 +++++++++++++++++++++
.../broker/delayed/bucket/ImmutableBucket.java | 28 +++-
.../broker/delayed/bucket/MutableBucket.java | 6 +-
.../PersistentDispatcherMultipleConsumers.java | 15 ++-
.../service/persistent/PersistentSubscription.java | 3 +
.../broker/service/persistent/PersistentTopic.java | 9 ++
.../stats/prometheus/AggregatedNamespaceStats.java | 19 +++
.../prometheus/AggregatedSubscriptionStats.java | 3 +
.../stats/prometheus/NamespaceStatsAggregator.java | 13 +-
.../pulsar/broker/stats/prometheus/TopicStats.java | 22 ++++
.../broker/delayed/MockBucketSnapshotStorage.java | 2 +-
.../bucket/BucketDelayedDeliveryTrackerTest.java | 23 ++--
.../persistent/BucketDelayedDeliveryTest.java | 143 ++++++++++++++++++++
.../policies/data/stats/SubscriptionStatsImpl.java | 12 ++
.../policies/data/stats/TopicMetricBean.java | 30 +++++
.../common/policies/data/stats/TopicStatsImpl.java | 13 ++
21 files changed, 532 insertions(+), 57 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 03e0fec0a53..52cebe15f6a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -366,7 +366,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
300;
@FieldContext(category = CATEGORY_SERVER, doc = """
- The max number of delayed message index in per bucket snapshot
segment, -1 means no limitation\
+ The max number of delayed message index in per bucket snapshot
segment, -1 means no limitation, \
after reaching the max number limitation, the snapshot segment
will be cut off.""")
private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 3cc2da8db1e..78229fef25a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -67,12 +67,6 @@ public interface DelayedDeliveryTracker extends
AutoCloseable {
*/
boolean shouldPauseAllDeliveries();
- /**
- * Tells whether this DelayedDeliveryTracker contains this message index,
- * if the tracker is not supported it or disabled this feature also will
return false.
- */
- boolean containsMessage(long ledgerId, long entryId);
-
/**
* Reset tick time use zk policies cache.
* @param tickTime
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 8de6ee58e2c..58358b06a46 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -178,11 +178,6 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
&& !hasMessageAvailable();
}
- @Override
- public boolean containsMessage(long ledgerId, long entryId) {
- return false;
- }
-
protected long nextDeliveryTime() {
return priorityQueue.peekN1();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 08202bb1915..e7d4f9301dd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -67,7 +67,7 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
@Override
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long
bucketId) {
return openLedger(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0,
0).
+ ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
thenApply(entryEnumeration ->
parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
}
@@ -75,17 +75,13 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
public CompletableFuture<List<SnapshotSegment>>
getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
return openLedger(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle,
firstSegmentEntryId,
+ ledgerHandle -> getLedgerEntry(ledgerHandle,
firstSegmentEntryId,
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
}
@Override
public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
- return openLedger(bucketId).thenApply(ledgerHandle -> {
- long length = ledgerHandle.getLength();
- closeLedger(ledgerHandle);
- return length;
- });
+ return openLedger(bucketId).thenApply(LedgerHandle::getLength);
}
@Override
@@ -212,8 +208,8 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
});
}
- CompletableFuture<Enumeration<LedgerEntry>>
getLedgerEntryThenCloseLedger(LedgerHandle ledger,
-
long firstEntryId, long lastEntryId) {
+ CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle
ledger,
+ long
firstEntryId, long lastEntryId) {
final CompletableFuture<Enumeration<LedgerEntry>> future = new
CompletableFuture<>();
ledger.asyncReadEntries(firstEntryId, lastEntryId,
(rc, handle, entries, ctx) -> {
@@ -222,7 +218,6 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
} else {
future.complete(entries);
}
- closeLedger(handle);
}, null
);
return future;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index ef7be187cec..a34bd51af98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -57,6 +57,7 @@ import
org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
@@ -69,6 +70,10 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
static final int AsyncOperationTimeoutSeconds = 60;
+ private static final Long INVALID_BUCKET_ID = -1L;
+
+ private static final int MAX_MERGE_NUM = 4;
+
private final long minIndexCountPerBucket;
private final long timeStepPerBucketSnapshotSegmentInMillis;
@@ -93,9 +98,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final Table<Long, Long, ImmutableBucket>
snapshotSegmentLastIndexTable;
- private static final Long INVALID_BUCKET_ID = -1L;
-
- private static final int MAX_MERGE_NUM = 4;
+ private final BucketDelayedMessageIndexStats stats;
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
Timer timer, long tickTimeMillis,
@@ -125,6 +128,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
this.lastMutableBucket =
new MutableBucket(dispatcher.getName(),
dispatcher.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
+ this.stats = new BucketDelayedMessageIndexStats();
this.numberDelayedMessages = recoverBucketSnapshot();
}
@@ -161,8 +165,9 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
}
try {
-
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS);
+
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2,
TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ log.error("[{}] Failed to recover delayed message index bucket
snapshot.", dispatcher.getName(), e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
@@ -193,7 +198,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
ImmutableBucket immutableBucket = mapEntry.getValue();
immutableBucketMap.remove(key);
// delete asynchronously without waiting for completion
- immutableBucket.asyncDeleteBucketSnapshot();
+ immutableBucket.asyncDeleteBucketSnapshot(stats);
}
MutableLong numberDelayedMessages = new MutableLong(0);
@@ -246,7 +251,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return Optional.ofNullable(immutableBuckets.get(ledgerId));
}
- private void afterCreateImmutableBucket(Pair<ImmutableBucket,
DelayedIndex> immutableBucketDelayedIndexPair) {
+ private void afterCreateImmutableBucket(Pair<ImmutableBucket,
DelayedIndex> immutableBucketDelayedIndexPair,
+ long startTime) {
if (immutableBucketDelayedIndexPair != null) {
ImmutableBucket immutableBucket =
immutableBucketDelayedIndexPair.getLeft();
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId,
immutableBucket.endLedgerId),
@@ -260,14 +266,19 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
CompletableFuture<Long> future =
createFuture.handle((bucketId, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
+ immutableBucket.asyncUpdateSnapshotLength();
log.info("[{}] Creat bucket snapshot finish,
bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());
+
+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
+ System.currentTimeMillis() - startTime);
+
return bucketId;
}
- //TODO Record create snapshot failed
- log.error("[{}] Failed to create bucket snapshot,
bucketKey: {}",
- dispatcher.getName(), immutableBucket.bucketKey(),
ex);
+ log.error("[{}] Failed to create bucket snapshot,
bucketKey: {}", dispatcher.getName(),
+ immutableBucket.bucketKey(), ex);
+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
// Put indexes back into the shared queue and downgrade to
memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
@@ -311,12 +322,14 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
+ long createStartTime = System.currentTimeMillis();
+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
lastMutableBucket.sealBucketAndAsyncPersistent(
this.timeStepPerBucketSnapshotSegmentInMillis,
this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
- afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair,
createStartTime);
lastMutableBucket.resetLastMutableBucketRange();
if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
@@ -374,7 +387,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
}
if (minIndex >= 0) {
- return values.subList(minIndex, minIndex + MAX_MERGE_NUM);
+ return values.subList(minIndex, minIndex + mergeNum);
} else if (mergeNum > 2){
return selectMergedBuckets(values, mergeNum - 1);
} else {
@@ -400,6 +413,9 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
immutableBucket.merging = true;
}
+
+ long mergeStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
return
asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
synchronized (this) {
for (ImmutableBucket immutableBucket :
toBeMergeImmutableBuckets) {
@@ -409,9 +425,14 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
if (ex != null) {
log.error("[{}] Failed to merge bucket snapshot, bucketKeys:
{}",
dispatcher.getName(), bucketsStr, ex);
+
+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
} else {
log.info("[{}] Merge bucket snapshot finish, bucketKeys: {},
bucketNum: {}",
dispatcher.getName(), bucketsStr,
immutableBuckets.asMapOfRanges().size());
+
+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
+ System.currentTimeMillis() - mergeStartTime);
}
});
}
@@ -436,6 +457,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
})
.thenAccept(combinedDelayedIndexQueue -> {
synchronized (BucketDelayedDeliveryTracker.this) {
+ long createStartTime = System.currentTimeMillis();
+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis,
@@ -461,12 +484,12 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
}
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
-
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___
-> {
List<CompletableFuture<Void>>
removeFutures =
-
buckets.stream().map(ImmutableBucket::asyncDeleteBucketSnapshot)
+ buckets.stream().map(bucket ->
bucket.asyncDeleteBucketSnapshot(stats))
.toList();
return
FutureUtil.waitForAll(removeFutures);
});
@@ -557,15 +580,17 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
if (bucket.currentSegmentEntryId ==
bucket.lastSegmentEntryId) {
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId,
bucket.endLedgerId));
- bucket.asyncDeleteBucketSnapshot();
+ bucket.asyncDeleteBucketSnapshot(stats);
continue;
}
+ long loadStartTime = System.currentTimeMillis();
+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId,
bucket.endLedgerId));
- bucket.asyncDeleteBucketSnapshot();
+ bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
@@ -583,9 +608,14 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
log.error("[{}] Failed to load bucket snapshot
segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(),
preSegmentEntryId + 1, ex);
+
+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment
finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(),
preSegmentEntryId + 1);
+
+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
+ System.currentTimeMillis() -
loadStartTime);
}
}).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1),
TimeUnit.SECONDS);
} catch (Exception e) {
@@ -645,7 +675,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
Iterator<ImmutableBucket> iterator =
immutableBuckets.asMapOfRanges().values().iterator();
while (iterator.hasNext()) {
ImmutableBucket bucket = iterator.next();
- futures.add(bucket.clear());
+ futures.add(bucket.clear(stats));
numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
iterator.remove();
}
@@ -661,7 +691,6 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
.orElse(false);
}
- @Override
public boolean containsMessage(long ledgerId, long entryId) {
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
return true;
@@ -670,4 +699,15 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return findImmutableBucket(ledgerId).map(bucket ->
bucket.containsMessage(ledgerId, entryId))
.orElse(false);
}
+
+ public Map<String, TopicMetricBean> genTopicMetricMap() {
+ stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
+
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() +
this.lastMutableBucket.size());
+ MutableLong totalSnapshotLength = new MutableLong();
+ immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
+ totalSnapshotLength.add(immutableBucket.getSnapshotLength());
+ });
+ stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
+ return stats.genTopicMetricMap();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
new file mode 100644
index 00000000000..68788c359d5
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
+
+public class BucketDelayedMessageIndexStats {
+
+ private static final long[] BUCKETS = new long[]{50, 100, 500, 1000, 5000,
30000, 60000};
+
+ enum State {
+ succeed,
+ failed,
+ all
+ }
+
+ enum Type {
+ create,
+ load,
+ delete,
+ merge
+ }
+
+ private static final String BUCKET_TOTAL_NAME =
"pulsar_delayed_message_index_bucket_total";
+ private static final String INDEX_LOADED_NAME =
"pulsar_delayed_message_index_loaded";
+ private static final String SNAPSHOT_SIZE_BYTES_NAME =
"pulsar_delayed_message_index_bucket_snapshot_size_bytes";
+ private static final String OP_COUNT_NAME =
"pulsar_delayed_message_index_bucket_op_count";
+ private static final String OP_LATENCY_NAME =
"pulsar_delayed_message_index_bucket_op_latency_ms";
+
+ private final AtomicInteger delayedMessageIndexBucketTotal = new
AtomicInteger();
+ private final AtomicLong delayedMessageIndexLoaded = new AtomicLong();
+ private final AtomicLong delayedMessageIndexBucketSnapshotSizeBytes = new
AtomicLong();
+ private final Map<String, StatsBuckets>
delayedMessageIndexBucketOpLatencyMs = new ConcurrentHashMap<>();
+ private final Map<String, LongAdder> delayedMessageIndexBucketOpCount =
new ConcurrentHashMap<>();
+
+ public BucketDelayedMessageIndexStats() {
+ }
+
+ public Map<String, TopicMetricBean> genTopicMetricMap() {
+ Map<String, TopicMetricBean> metrics = new HashMap<>();
+
+ metrics.put(BUCKET_TOTAL_NAME,
+ new TopicMetricBean(BUCKET_TOTAL_NAME,
delayedMessageIndexBucketTotal.get(), null));
+
+ metrics.put(INDEX_LOADED_NAME,
+ new TopicMetricBean(INDEX_LOADED_NAME,
delayedMessageIndexLoaded.get(), null));
+
+ metrics.put(SNAPSHOT_SIZE_BYTES_NAME,
+ new TopicMetricBean(SNAPSHOT_SIZE_BYTES_NAME,
delayedMessageIndexBucketSnapshotSizeBytes.get(), null));
+
+ delayedMessageIndexBucketOpCount.forEach((k, count) -> {
+ String[] labels = splitKey(k);
+ String[] labelsAndValues = new String[] {"state", labels[0],
"type", labels[1]};
+ String key = OP_COUNT_NAME + joinKey(labelsAndValues);
+ metrics.put(key, new TopicMetricBean(OP_COUNT_NAME,
count.sumThenReset(), labelsAndValues));
+ });
+
+ delayedMessageIndexBucketOpLatencyMs.forEach((typeName, statsBuckets)
-> {
+ statsBuckets.refresh();
+ long[] buckets = statsBuckets.getBuckets();
+ for (int i = 0; i < buckets.length; i++) {
+ long count = buckets[i];
+ if (count == 0L) {
+ continue;
+ }
+ String quantile;
+ if (i == BUCKETS.length) {
+ quantile = "overflow";
+ } else {
+ quantile = String.valueOf(BUCKETS[i]);
+ }
+ String[] labelsAndValues = new String[] {"type", typeName,
"quantile", quantile};
+ String key = OP_LATENCY_NAME + joinKey(labelsAndValues);
+
+ metrics.put(key, new TopicMetricBean(OP_LATENCY_NAME, count,
labelsAndValues));
+ }
+ String[] labelsAndValues = new String[] {"type", typeName};
+ metrics.put(OP_LATENCY_NAME + "_count" + joinKey(labelsAndValues),
+ new TopicMetricBean(OP_LATENCY_NAME + "_count",
statsBuckets.getCount(), labelsAndValues));
+ metrics.put(OP_LATENCY_NAME + "_sum" + joinKey(labelsAndValues),
+ new TopicMetricBean(OP_LATENCY_NAME + "_sum",
statsBuckets.getSum(), labelsAndValues));
+ });
+
+ return metrics;
+ }
+
+ public void recordNumOfBuckets(int numOfBuckets) {
+ delayedMessageIndexBucketTotal.set(numOfBuckets);
+ }
+
+ public void recordDelayedMessageIndexLoaded(long num) {
+ delayedMessageIndexLoaded.set(num);
+ }
+
+ public void recordBucketSnapshotSizeBytes(long sizeBytes) {
+ delayedMessageIndexBucketSnapshotSizeBytes.set(sizeBytes);
+ }
+
+ public void recordTriggerEvent(Type eventType) {
+
delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.all.name(),
eventType.name()),
+ k -> new LongAdder()).increment();
+ }
+
+ public void recordSuccessEvent(Type eventType, long cost) {
+
delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.succeed.name(),
eventType.name()),
+ k -> new LongAdder()).increment();
+ delayedMessageIndexBucketOpLatencyMs.computeIfAbsent(eventType.name(),
+ k -> new StatsBuckets(BUCKETS)).addValue(cost);
+ }
+
+ public void recordFailEvent(Type eventType) {
+
delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.failed.name(),
eventType.name()),
+ k -> new LongAdder()).increment();
+ }
+
+ public static String joinKey(String... values) {
+ return String.join("_", values);
+ }
+
+ public static String[] splitKey(String key) {
+ return key.split("_");
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 969d326e281..82e98cefa5d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -131,6 +131,9 @@ class ImmutableBucket extends Bucket {
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
+ if (isRecover) {
+ this.asyncUpdateSnapshotLength();
+ }
return indexList;
});
});
@@ -175,7 +178,9 @@ class ImmutableBucket extends Bucket {
}, BucketSnapshotPersistenceException.class, MaxRetryTimes);
}
- CompletableFuture<Void> asyncDeleteBucketSnapshot() {
+ CompletableFuture<Void>
asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats) {
+ long deleteStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete);
String bucketKey = bucketKey();
long bucketId = getAndUpdateBucketId();
return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
@@ -184,16 +189,33 @@ class ImmutableBucket extends Bucket {
if (ex != null) {
log.error("[{}] Failed to delete bucket snapshot,
bucketId: {}, bucketKey: {}",
dispatcherName, bucketId, bucketKey, ex);
+
+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete);
} else {
log.info("[{}] Delete bucket snapshot finish,
bucketId: {}, bucketKey: {}",
dispatcherName, bucketId, bucketKey);
+
+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
+ System.currentTimeMillis() - deleteStartTime);
}
});
}
- CompletableFuture<Void> clear() {
+ CompletableFuture<Void> clear(BucketDelayedMessageIndexStats stats) {
delayedIndexBitMap.clear();
return
getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).exceptionally(e -> null)
- .thenCompose(__ -> asyncDeleteBucketSnapshot());
+ .thenCompose(__ -> asyncDeleteBucketSnapshot(stats));
+ }
+
+ protected CompletableFuture<Long> asyncUpdateSnapshotLength() {
+ long bucketId = getAndUpdateBucketId();
+ return
bucketSnapshotStorage.getBucketSnapshotLength(bucketId).whenComplete((length,
ex) -> {
+ if (ex != null) {
+ log.error("[{}] Failed to get snapshot length, bucketId: {},
bucketKey: {}",
+ dispatcherName, bucketId, bucketKey(), ex);
+ } else {
+ setSnapshotLength(length);
+ }
+ });
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index f8a4ecc7a4d..e49ebe9606e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -62,8 +62,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
final long timeStepPerBucketSnapshotSegment, final int
maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue
delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
- log.info("[{}] Creating bucket snapshot, startLedgerId: {},
endLedgerId: {}", dispatcherName,
- startLedgerId, endLedgerId);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Creating bucket snapshot, startLedgerId: {},
endLedgerId: {}", dispatcherName,
+ startLedgerId, endLedgerId);
+ }
if (delayedIndexQueue.isEmpty()) {
return null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index e5c9e85bac3..7ff6e72d02a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -72,6 +72,7 @@ import
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
@@ -332,7 +333,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
Predicate<PositionImpl> skipCondition = null;
final DelayedDeliveryTracker deliveryTracker =
delayedDeliveryTracker.get();
if (deliveryTracker instanceof
BucketDelayedDeliveryTracker) {
- skipCondition = position -> deliveryTracker
+ skipCondition = position ->
((BucketDelayedDeliveryTracker) deliveryTracker)
.containsMessage(position.getLedgerId(),
position.getEntryId());
}
cursor.asyncReadEntriesWithSkipOrWait(messagesToRead,
bytesToRead, this, ReadType.Normal,
@@ -1180,6 +1181,18 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
return 0;
}
+ public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+ if (delayedDeliveryTracker.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ if (delayedDeliveryTracker.get() instanceof
BucketDelayedDeliveryTracker) {
+ return ((BucketDelayedDeliveryTracker)
delayedDeliveryTracker.get()).genTopicMetricMap();
+ }
+
+ return Collections.emptyMap();
+ }
+
public ManagedCursor getCursor() {
return cursor;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index bdb3c9fc391..4ed191a9b4f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1147,6 +1147,9 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
subStats.delayedMessageIndexSizeInBytes =
((PersistentDispatcherMultipleConsumers)
dispatcher).getDelayedTrackerMemoryUsage();
+
+ subStats.bucketDelayedIndexStats =
+ ((PersistentDispatcherMultipleConsumers)
dispatcher).getBucketDelayedIndexStats();
}
if (Subscription.isIndividualAckMode(subType)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 82a4f531235..fa08330ff3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -153,6 +153,7 @@ import
org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -2183,6 +2184,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes +=
subStats.delayedMessageIndexSizeInBytes;
+
+ subStats.bucketDelayedIndexStats.forEach((k, v) -> {
+ TopicMetricBean topicMetricBean =
+ stats.bucketDelayedIndexStats.computeIfAbsent(k, __ ->
new TopicMetricBean());
+ topicMetricBean.name = v.name;
+ topicMetricBean.labelsAndValues = v.labelsAndValues;
+ topicMetricBean.value += v.value;
+ });
});
replicators.forEach((cluster, replicator) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 0a905daa341..ea77bd69302 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats.prometheus;
import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.compaction.CompactionRecord;
public class AggregatedNamespaceStats {
@@ -65,6 +66,8 @@ public class AggregatedNamespaceStats {
StatsBuckets compactionLatencyBuckets = new
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
int delayedMessageIndexSizeInBytes;
+ Map<String, TopicMetricBean> bucketDelayedIndexStats = new HashMap<>();
+
void updateStats(TopicStats stats) {
topicsCount++;
@@ -83,6 +86,14 @@ public class AggregatedNamespaceStats {
msgOutCounter += stats.msgOutCounter;
delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;
+ stats.bucketDelayedIndexStats.forEach((k, v) -> {
+ TopicMetricBean topicMetricBean =
+ bucketDelayedIndexStats.computeIfAbsent(k, __ -> new
TopicMetricBean());
+ topicMetricBean.name = v.name;
+ topicMetricBean.labelsAndValues = v.labelsAndValues;
+ topicMetricBean.value += v.value;
+ });
+
this.ongoingTxnCount += stats.ongoingTxnCount;
this.abortedTxnCount += stats.abortedTxnCount;
this.committedTxnCount += stats.committedTxnCount;
@@ -132,6 +143,13 @@ public class AggregatedNamespaceStats {
subsStats.filterRejectedMsgCount += as.filterRejectedMsgCount;
subsStats.filterRescheduledMsgCount +=
as.filterRescheduledMsgCount;
subsStats.delayedMessageIndexSizeInBytes +=
as.delayedMessageIndexSizeInBytes;
+ as.bucketDelayedIndexStats.forEach((k, v) -> {
+ TopicMetricBean topicMetricBean =
+ subsStats.bucketDelayedIndexStats.computeIfAbsent(k,
__ -> new TopicMetricBean());
+ topicMetricBean.name = v.name;
+ topicMetricBean.labelsAndValues = v.labelsAndValues;
+ topicMetricBean.value += v.value;
+ });
as.consumerStat.forEach((c, v) -> {
AggregatedConsumerStats consumerStats =
subsStats.consumerStat.computeIfAbsent(c, k -> new
AggregatedConsumerStats());
@@ -172,5 +190,6 @@ public class AggregatedNamespaceStats {
replicationStats.clear();
subscriptionStats.clear();
delayedMessageIndexSizeInBytes = 0;
+ bucketDelayedIndexStats.clear();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index 383c671754d..da0324c5565 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats.prometheus;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
public class AggregatedSubscriptionStats {
@@ -75,4 +76,6 @@ public class AggregatedSubscriptionStats {
public Map<Consumer, AggregatedConsumerStats> consumerStat = new
HashMap<>();
long delayedMessageIndexSizeInBytes;
+
+ public Map<String, TopicMetricBean> bucketDelayedIndexStats = new
HashMap<>();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 918aef539cf..32fb06ea3ce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -155,6 +156,7 @@ public class NamespaceStatsAggregator {
subsStats.filterRejectedMsgCount =
subscriptionStats.filterRejectedMsgCount;
subsStats.filterRescheduledMsgCount =
subscriptionStats.filterRescheduledMsgCount;
subsStats.delayedMessageIndexSizeInBytes =
subscriptionStats.delayedMessageIndexSizeInBytes;
+ subsStats.bucketDelayedIndexStats =
subscriptionStats.bucketDelayedIndexStats;
}
private static void getTopicStats(Topic topic, TopicStats stats, boolean
includeConsumerMetrics,
@@ -197,6 +199,7 @@ public class NamespaceStatsAggregator {
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.delayedMessageIndexSizeInBytes =
tStatus.delayedMessageIndexSizeInBytes;
+ stats.bucketDelayedIndexStats = tStatus.bucketDelayedIndexStats;
stats.abortedTxnCount = tStatus.abortedTxnCount;
stats.ongoingTxnCount = tStatus.ongoingTxnCount;
stats.committedTxnCount = tStatus.committedTxnCount;
@@ -379,6 +382,10 @@ public class NamespaceStatsAggregator {
writeMetric(stream, "pulsar_delayed_message_index_size_bytes",
stats.delayedMessageIndexSizeInBytes, cluster,
namespace);
+ stats.bucketDelayedIndexStats.forEach((k, metric) -> {
+ writeMetric(stream, metric.name, metric.value, cluster, namespace,
metric.labelsAndValues);
+ });
+
writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
@@ -472,8 +479,10 @@ public class NamespaceStatsAggregator {
}
private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
- String namespace) {
- stream.writeSample(metricName, value, "cluster", cluster, "namespace",
namespace);
+ String namespace, String...
extraLabelsAndValues) {
+ String[] labelsAndValues = new String[]{"cluster", cluster,
"namespace", namespace};
+ String[] labels = ArrayUtils.addAll(labelsAndValues,
extraLabelsAndValues);
+ stream.writeSample(metricName, value, labels);
}
private static void writeReplicationStat(PrometheusMetricStreams stream,
String metricName,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index abc6979484e..3a2563a8758 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;
@@ -70,6 +71,8 @@ class TopicStats {
StatsBuckets compactionLatencyBuckets = new
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
public long delayedMessageIndexSizeInBytes;
+ Map<String, TopicMetricBean> bucketDelayedIndexStats = new HashMap<>();
+
public void reset() {
subscriptionsCount = 0;
producersCount = 0;
@@ -107,6 +110,7 @@ class TopicStats {
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
delayedMessageIndexSizeInBytes = 0;
+ bucketDelayedIndexStats.clear();
}
public static void printTopicStats(PrometheusMetricStreams stream,
TopicStats stats,
@@ -162,6 +166,11 @@ class TopicStats {
writeMetric(stream, "pulsar_delayed_message_index_size_bytes",
stats.delayedMessageIndexSizeInBytes,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ for (TopicMetricBean topicMetricBean :
stats.bucketDelayedIndexStats.values()) {
+ writeTopicMetric(stream, topicMetricBean.name,
topicMetricBean.value, cluster, namespace,
+ topic, splitTopicAndPartitionIndexLabel,
topicMetricBean.labelsAndValues);
+ }
+
long[] latencyBuckets =
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
latencyBuckets[0], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
@@ -310,6 +319,13 @@ class TopicStats {
subsStats.delayedMessageIndexSizeInBytes, cluster,
namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ final String[] subscriptionLabel = {"subscription", sub};
+ for (TopicMetricBean topicMetricBean :
subsStats.bucketDelayedIndexStats.values()) {
+ String[] labelsAndValues =
ArrayUtils.addAll(subscriptionLabel, topicMetricBean.labelsAndValues);
+ writeTopicMetric(stream, topicMetricBean.name,
topicMetricBean.value, cluster, namespace,
+ topic, splitTopicAndPartitionIndexLabel,
labelsAndValues);
+ }
+
subsStats.consumerStat.forEach((c, consumerStats) -> {
writeConsumerMetric(stream,
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
@@ -409,6 +425,12 @@ class TopicStats {
writeMetric(stream, "pulsar_compaction_latency_count",
stats.compactionLatencyBuckets.getCount(), cluster,
namespace, topic,
splitTopicAndPartitionIndexLabel);
+
+ for (TopicMetricBean topicMetricBean :
stats.bucketDelayedIndexStats.values()) {
+ String[] labelsAndValues = topicMetricBean.labelsAndValues;
+ writeTopicMetric(stream, topicMetricBean.name,
topicMetricBean.value, cluster, namespace,
+ topic, splitTopicAndPartitionIndexLabel,
labelsAndValues);
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index cf7310c7067..9e924bdeda3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -174,7 +174,7 @@ public class MockBucketSnapshotStorage implements
BucketSnapshotStorage {
long length = 0;
List<ByteBuf> bufList = this.bucketSnapshots.get(bucketId);
for (ByteBuf byteBuf : bufList) {
- length += byteBuf.array().length;
+ length += byteBuf.readableBytes();
}
return length;
}, executorService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 717bada7705..95234d688f6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest;
-import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.MockManagedCursor;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -158,7 +157,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
}
@Test(dataProvider = "delayedTracker")
- public void testContainsMessage(DelayedDeliveryTracker tracker) {
+ public void testContainsMessage(BucketDelayedDeliveryTracker tracker) {
tracker.addMessage(1, 1, 10);
tracker.addMessage(2, 2, 20);
@@ -191,6 +190,12 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
clockTime.set(1 * 10);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(
+
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x ->
x.merging ||
+ !x.getSnapshotCreateFuture().get().isDone()));
+ });
+
assertTrue(tracker.hasMessageAvailable());
Set<PositionImpl> scheduledMessages =
tracker.getScheduledMessages(100);
@@ -202,16 +207,16 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
clockTime.set(30 * 10);
- tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1,50);
+ BucketDelayedDeliveryTracker tracker2 = new
BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50);
- assertFalse(tracker.containsMessage(101, 101));
- assertEquals(tracker.getNumberOfDelayedMessages(), 70);
+ assertFalse(tracker2.containsMessage(101, 101));
+ assertEquals(tracker2.getNumberOfDelayedMessages(), 70);
clockTime.set(100 * 10);
- assertTrue(tracker.hasMessageAvailable());
- scheduledMessages = tracker.getScheduledMessages(70);
+ assertTrue(tracker2.hasMessageAvailable());
+ scheduledMessages = tracker2.getScheduledMessages(70);
assertEquals(scheduledMessages.size(), 70);
@@ -221,7 +226,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
i++;
}
- tracker.close();
+ tracker2.close();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 09b7cbbf1b9..5480a2e7a70 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -19,18 +19,26 @@
package org.apache.pulsar.broker.service.persistent;
import static
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Multimap;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
@@ -162,4 +170,139 @@ public class BucketDelayedDeliveryTest extends
DelayedDeliveryTest {
}
}
}
+
+
+    /**
+     * Checks the Prometheus output for the bucket delayed-message index metrics of a topic that has two
+     * shared subscriptions. Each metric is expected once per subscription, and the topic-level and
+     * namespace-level samples are compared against the values reported for the subscriptions.
+     *
+     * @throws Exception if the broker restart, the client calls, or the metrics generation fail
+     */
+    @Test
+    public void testBucketDelayedIndexMetrics() throws Exception {
+        // Restart the test environment so the metrics scraped below are produced by this test only.
+        cleanup();
+        setup();
+
+        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testBucketDelayedIndexMetrics");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test_sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test_sub2")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        final int N = 101;
+
+        // Delays of one hour or more keep every message in the delayed index for the whole test.
+        for (int i = 0; i < N; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .deliverAfter(3600 + i, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+        producer.flush();
+
+        // Fixed wait before scraping; presumably lets the asynchronous bucket snapshot work finish.
+        // TODO(review): confirm, and consider polling for the expected state instead of sleeping.
+        Thread.sleep(2000);
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, true, true, output);
+        String metricsStr = output.toString(StandardCharsets.UTF_8);
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        // --- pulsar_delayed_message_index_bucket_total ---
+        List<PrometheusMetricsTest.Metric> bucketsMetrics =
+                metricsMap.get("pulsar_delayed_message_index_bucket_total").stream()
+                        .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
+        MutableInt bucketsSum = new MutableInt();
+        bucketsMetrics.stream().filter(metric -> metric.tags.containsKey("subscription")).forEach(metric -> {
+            assertEquals(3, metric.value);
+            bucketsSum.add(metric.value);
+        });
+        assertEquals(6, bucketsSum.intValue());
+        Optional<PrometheusMetricsTest.Metric> bucketsTopicMetric =
+                bucketsMetrics.stream().filter(metric -> !metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(bucketsTopicMetric.isPresent());
+        assertEquals(bucketsSum.intValue(), bucketsTopicMetric.get().value);
+
+        // --- pulsar_delayed_message_index_loaded ---
+        List<PrometheusMetricsTest.Metric> loadedIndexMetrics =
+                metricsMap.get("pulsar_delayed_message_index_loaded").stream()
+                        .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
+        MutableInt loadedIndexSum = new MutableInt();
+        // Explicit loop instead of peek(): Stream.count() is not required to run intermediate operations.
+        List<PrometheusMetricsTest.Metric> loadedIndexSubMetrics = loadedIndexMetrics.stream()
+                .filter(metric -> metric.tags.containsKey("subscription")).toList();
+        for (PrometheusMetricsTest.Metric metric : loadedIndexSubMetrics) {
+            assertTrue(metric.value > 0 && metric.value <= N);
+            loadedIndexSum.add(metric.value);
+        }
+        assertEquals(2, loadedIndexSubMetrics.size());
+        // The topic-level sample is taken from the loaded-index metrics themselves; it was previously
+        // looked up in bucketsMetrics, which compared the sum against the bucket total instead.
+        Optional<PrometheusMetricsTest.Metric> loadedIndexTopicMetrics =
+                loadedIndexMetrics.stream().filter(metric -> !metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(loadedIndexTopicMetrics.isPresent());
+        assertEquals(loadedIndexSum.intValue(), loadedIndexTopicMetrics.get().value);
+
+        // --- pulsar_delayed_message_index_bucket_snapshot_size_bytes ---
+        List<PrometheusMetricsTest.Metric> snapshotSizeBytesMetrics =
+                metricsMap.get("pulsar_delayed_message_index_bucket_snapshot_size_bytes").stream()
+                        .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
+        MutableInt snapshotSizeBytesSum = new MutableInt();
+        List<PrometheusMetricsTest.Metric> snapshotSizeBytesSubMetrics = snapshotSizeBytesMetrics.stream()
+                .filter(metric -> metric.tags.containsKey("subscription")).toList();
+        for (PrometheusMetricsTest.Metric metric : snapshotSizeBytesSubMetrics) {
+            assertTrue(metric.value > 0);
+            snapshotSizeBytesSum.add(metric.value);
+        }
+        assertEquals(2, snapshotSizeBytesSubMetrics.size());
+        Optional<PrometheusMetricsTest.Metric> snapshotSizeBytesTopicMetrics =
+                snapshotSizeBytesMetrics.stream().filter(metric -> !metric.tags.containsKey("subscription"))
+                        .findFirst();
+        assertTrue(snapshotSizeBytesTopicMetrics.isPresent());
+        assertEquals(snapshotSizeBytesSum.intValue(), snapshotSizeBytesTopicMetrics.get().value);
+
+        // --- pulsar_delayed_message_index_bucket_op_count (state=succeed, type=create) ---
+        List<PrometheusMetricsTest.Metric> opCountMetrics =
+                metricsMap.get("pulsar_delayed_message_index_bucket_op_count").stream()
+                        .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
+        MutableInt opCountMetricsSum = new MutableInt();
+        List<PrometheusMetricsTest.Metric> opCountSubMetrics = opCountMetrics.stream()
+                .filter(metric -> metric.tags.get("state").equals("succeed")
+                        && metric.tags.get("type").equals("create")
+                        && metric.tags.containsKey("subscription"))
+                .toList();
+        for (PrometheusMetricsTest.Metric metric : opCountSubMetrics) {
+            assertTrue(metric.value >= 2);
+            opCountMetricsSum.add(metric.value);
+        }
+        assertEquals(2, opCountSubMetrics.size());
+        Optional<PrometheusMetricsTest.Metric> opCountTopicMetrics =
+                opCountMetrics.stream()
+                        .filter(metric -> metric.tags.get("state").equals("succeed")
+                                && metric.tags.get("type").equals("create")
+                                && !metric.tags.containsKey("subscription"))
+                        .findFirst();
+        assertTrue(opCountTopicMetrics.isPresent());
+        assertEquals(opCountMetricsSum.intValue(), opCountTopicMetrics.get().value);
+
+        // --- pulsar_delayed_message_index_bucket_op_latency_ms (type=create) ---
+        List<PrometheusMetricsTest.Metric> opLatencyMetrics =
+                metricsMap.get("pulsar_delayed_message_index_bucket_op_latency_ms").stream()
+                        .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
+        MutableInt opLatencyMetricsSum = new MutableInt();
+        List<PrometheusMetricsTest.Metric> opLatencySubMetrics = opLatencyMetrics.stream()
+                .filter(metric -> metric.tags.get("type").equals("create")
+                        && metric.tags.containsKey("subscription"))
+                .toList();
+        for (PrometheusMetricsTest.Metric metric : opLatencySubMetrics) {
+            assertTrue(metric.tags.containsKey("quantile"));
+            opLatencyMetricsSum.add(metric.value);
+        }
+        assertTrue(opLatencySubMetrics.size() >= 2);
+        // NOTE(review): this lookup reads opCountMetrics, not opLatencyMetrics. It is left unchanged because
+        // the sum above adds every quantile sample of both subscriptions, so it may not equal any single
+        // topic-level latency sample - confirm which comparison is intended before changing it.
+        Optional<PrometheusMetricsTest.Metric> opLatencyTopicMetrics =
+                opCountMetrics.stream()
+                        .filter(metric -> metric.tags.get("type").equals("create")
+                                && !metric.tags.containsKey("subscription"))
+                        .findFirst();
+        assertTrue(opLatencyTopicMetrics.isPresent());
+        assertEquals(opLatencyMetricsSum.intValue(), opLatencyTopicMetrics.get().value);
+
+        // --- namespace-level output (second generate() argument is false here instead of true) ---
+        ByteArrayOutputStream namespaceOutput = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, true, true, namespaceOutput);
+        Multimap<String, PrometheusMetricsTest.Metric> namespaceMetricsMap =
+                PrometheusMetricsTest.parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8));
+
+        Optional<PrometheusMetricsTest.Metric> namespaceMetric =
+                namespaceMetricsMap.get("pulsar_delayed_message_index_bucket_total").stream().findFirst();
+        assertTrue(namespaceMetric.isPresent());
+        assertEquals(6, namespaceMetric.get().value);
+    }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 9c7e24ba021..25fa666523f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -134,6 +134,8 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
/** The size of InMemoryDelayedDeliveryTracer memory usage. */
public long delayedMessageIndexSizeInBytes;
+ public Map<String, TopicMetricBean> bucketDelayedIndexStats;
+
/** SubscriptionProperties (key/value strings) associated with this
subscribe. */
public Map<String, String> subscriptionProperties;
@@ -149,6 +151,7 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
this.consumers = new ArrayList<>();
this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
this.subscriptionProperties = new HashMap<>();
+ this.bucketDelayedIndexStats = new HashMap<>();
}
public void reset() {
@@ -175,6 +178,7 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
filterAcceptedMsgCount = 0;
filterRejectedMsgCount = 0;
filterRescheduledMsgCount = 0;
+ bucketDelayedIndexStats.clear();
}
// if the stats are added for the 1st time, we will need to make a copy of
these stats and add it to the current
@@ -215,6 +219,14 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount;
this.filterRejectedMsgCount += stats.filterRejectedMsgCount;
this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount;
+ stats.bucketDelayedIndexStats.forEach((k, v) -> {
+ TopicMetricBean topicMetricBean =
+ this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new
TopicMetricBean());
+ topicMetricBean.name = v.name;
+ topicMetricBean.labelsAndValues = v.labelsAndValues;
+ topicMetricBean.value += v.value;
+ });
+
return this;
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicMetricBean.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicMetricBean.java
new file mode 100644
index 00000000000..e01a9d7aa71
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicMetricBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+/**
+ * Plain holder for one metric sample: a metric name, a numeric value, and an array of extra labels.
+ * Instances are stored in the {@code bucketDelayedIndexStats} maps of the stats classes, where
+ * aggregation copies {@code name} and {@code labelsAndValues} and adds up {@code value}.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+public class TopicMetricBean {
+ /** Metric name passed to the Prometheus writer when the sample is emitted. */
+ public String name;
+ /** Sample value; aggregating code accumulates it with {@code +=}. */
+ public double value;
+ /**
+  * Extra labels appended to the standard ones when the sample is written. Presumably a flat array of
+  * alternating label names and label values, like the {@code {"subscription", sub}} array it is
+  * concatenated with - TODO confirm, and confirm whether {@code null} is allowed.
+  */
+ public String[] labelsAndValues;
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 12d30124f7d..c9c4739b904 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -139,6 +139,9 @@ public class TopicStatsImpl implements TopicStats {
/** The size of InMemoryDelayedDeliveryTracer memory usage. */
public long delayedMessageIndexSizeInBytes;
+ /** Map of bucket delayed index statistics. */
+ public Map<String, TopicMetricBean> bucketDelayedIndexStats;
+
/** The compaction stats. */
public CompactionStatsImpl compaction;
@@ -182,6 +185,7 @@ public class TopicStatsImpl implements TopicStats {
this.subscriptions = new HashMap<>();
this.replication = new TreeMap<>();
this.compaction = new CompactionStatsImpl();
+ this.bucketDelayedIndexStats = new HashMap<>();
}
public void reset() {
@@ -214,6 +218,7 @@ public class TopicStatsImpl implements TopicStats {
this.delayedMessageIndexSizeInBytes = 0;
this.compaction.reset();
this.ownerBroker = null;
+ this.bucketDelayedIndexStats.clear();
}
// if the stats are added for the 1st time, we will need to make a copy of
these stats and add it to the current
@@ -244,6 +249,14 @@ public class TopicStatsImpl implements TopicStats {
this.abortedTxnCount = stats.abortedTxnCount;
this.committedTxnCount = stats.committedTxnCount;
+ stats.bucketDelayedIndexStats.forEach((k, v) -> {
+ TopicMetricBean topicMetricBean =
+ this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new
TopicMetricBean());
+ topicMetricBean.name = v.name;
+ topicMetricBean.labelsAndValues = v.labelsAndValues;
+ topicMetricBean.value += v.value;
+ });
+
for (int index = 0; index < stats.getPublishers().size(); index++) {
PublisherStats s = stats.getPublishers().get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {