This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 68c10eed760 [feat][broker][PIP-195] Add metrics for bucket delayed message tracker (#19716)
68c10eed760 is described below

commit 68c10eed7604aa3dcc3a6d8b548575e99b94dca2
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Mar 30 16:32:13 2023 +0800

    [feat][broker][PIP-195] Add metrics for bucket delayed message tracker (#19716)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   2 +-
 .../broker/delayed/DelayedDeliveryTracker.java     |   6 -
 .../delayed/InMemoryDelayedDeliveryTracker.java    |   5 -
 .../bucket/BookkeeperBucketSnapshotStorage.java    |  15 +--
 .../bucket/BucketDelayedDeliveryTracker.java       |  74 ++++++++---
 .../bucket/BucketDelayedMessageIndexStats.java     | 146 +++++++++++++++++++++
 .../broker/delayed/bucket/ImmutableBucket.java     |  28 +++-
 .../broker/delayed/bucket/MutableBucket.java       |   6 +-
 .../PersistentDispatcherMultipleConsumers.java     |  15 ++-
 .../service/persistent/PersistentSubscription.java |   3 +
 .../broker/service/persistent/PersistentTopic.java |   9 ++
 .../stats/prometheus/AggregatedNamespaceStats.java |  19 +++
 .../prometheus/AggregatedSubscriptionStats.java    |   3 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  13 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  22 ++++
 .../broker/delayed/MockBucketSnapshotStorage.java  |   2 +-
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  23 ++--
 .../persistent/BucketDelayedDeliveryTest.java      | 143 ++++++++++++++++++++
 .../policies/data/stats/SubscriptionStatsImpl.java |  12 ++
 .../policies/data/stats/TopicMetricBean.java       |  30 +++++
 .../common/policies/data/stats/TopicStatsImpl.java |  13 ++
 21 files changed, 532 insertions(+), 57 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 03e0fec0a53..52cebe15f6a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -366,7 +366,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 
300;
 
     @FieldContext(category = CATEGORY_SERVER, doc = """
-            The max number of delayed message index in per bucket snapshot 
segment, -1 means no limitation\
+            The max number of delayed message index in per bucket snapshot 
segment, -1 means no limitation, \
             after reaching the max number limitation, the snapshot segment 
will be cut off.""")
     private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 3cc2da8db1e..78229fef25a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -67,12 +67,6 @@ public interface DelayedDeliveryTracker extends 
AutoCloseable {
      */
     boolean shouldPauseAllDeliveries();
 
-    /**
-     * Tells whether this DelayedDeliveryTracker contains this message index,
-     * if the tracker is not supported it or disabled this feature also will 
return false.
-     */
-    boolean containsMessage(long ledgerId, long entryId);
-
     /**
      *  Reset tick time use zk policies cache.
      * @param tickTime
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 8de6ee58e2c..58358b06a46 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -178,11 +178,6 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                 && !hasMessageAvailable();
     }
 
-    @Override
-    public boolean containsMessage(long ledgerId, long entryId) {
-        return false;
-    }
-
     protected long nextDeliveryTime() {
         return priorityQueue.peekN1();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 08202bb1915..e7d4f9301dd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -67,7 +67,7 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long 
bucketId) {
         return openLedger(bucketId).thenCompose(
-                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 
0).
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
                         thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
     }
 
@@ -75,17 +75,13 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
     public CompletableFuture<List<SnapshotSegment>> 
getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              
long lastSegmentEntryId) {
         return openLedger(bucketId).thenCompose(
-                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 
firstSegmentEntryId,
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 
firstSegmentEntryId,
                         
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return openLedger(bucketId).thenApply(ledgerHandle -> {
-            long length = ledgerHandle.getLength();
-            closeLedger(ledgerHandle);
-            return length;
-        });
+        return openLedger(bucketId).thenApply(LedgerHandle::getLength);
     }
 
     @Override
@@ -212,8 +208,8 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
         });
     }
 
-    CompletableFuture<Enumeration<LedgerEntry>> 
getLedgerEntryThenCloseLedger(LedgerHandle ledger,
-                                                                              
long firstEntryId, long lastEntryId) {
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle 
ledger,
+                                                               long 
firstEntryId, long lastEntryId) {
         final CompletableFuture<Enumeration<LedgerEntry>> future = new 
CompletableFuture<>();
         ledger.asyncReadEntries(firstEntryId, lastEntryId,
                 (rc, handle, entries, ctx) -> {
@@ -222,7 +218,6 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
                     } else {
                         future.complete(entries);
                     }
-                    closeLedger(handle);
                 }, null
         );
         return future;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index ef7be187cec..a34bd51af98 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -57,6 +57,7 @@ import 
org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
 import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
 import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 import org.roaringbitmap.RoaringBitmap;
@@ -69,6 +70,10 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     static final int AsyncOperationTimeoutSeconds = 60;
 
+    private static final Long INVALID_BUCKET_ID = -1L;
+
+    private static final int MAX_MERGE_NUM = 4;
+
     private final long minIndexCountPerBucket;
 
     private final long timeStepPerBucketSnapshotSegmentInMillis;
@@ -93,9 +98,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final Table<Long, Long, ImmutableBucket> 
snapshotSegmentLastIndexTable;
 
-    private static final Long INVALID_BUCKET_ID = -1L;
-
-    private static final int MAX_MERGE_NUM = 4;
+    private final BucketDelayedMessageIndexStats stats;
 
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
                                  Timer timer, long tickTimeMillis,
@@ -125,6 +128,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         this.lastMutableBucket =
                 new MutableBucket(dispatcher.getName(), 
dispatcher.getCursor(), FutureUtil.Sequencer.create(),
                         bucketSnapshotStorage);
+        this.stats = new BucketDelayedMessageIndexStats();
         this.numberDelayedMessages = recoverBucketSnapshot();
     }
 
@@ -161,8 +165,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         }
 
         try {
-            
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS);
+            
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, 
TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            log.error("[{}] Failed to recover delayed message index bucket 
snapshot.", dispatcher.getName(), e);
             if (e instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
             }
@@ -193,7 +198,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             ImmutableBucket immutableBucket = mapEntry.getValue();
             immutableBucketMap.remove(key);
             // delete asynchronously without waiting for completion
-            immutableBucket.asyncDeleteBucketSnapshot();
+            immutableBucket.asyncDeleteBucketSnapshot(stats);
         }
 
         MutableLong numberDelayedMessages = new MutableLong(0);
@@ -246,7 +251,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return Optional.ofNullable(immutableBuckets.get(ledgerId));
     }
 
-    private void afterCreateImmutableBucket(Pair<ImmutableBucket, 
DelayedIndex> immutableBucketDelayedIndexPair) {
+    private void afterCreateImmutableBucket(Pair<ImmutableBucket, 
DelayedIndex> immutableBucketDelayedIndexPair,
+                                            long startTime) {
         if (immutableBucketDelayedIndexPair != null) {
             ImmutableBucket immutableBucket = 
immutableBucketDelayedIndexPair.getLeft();
             immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId),
@@ -260,14 +266,19 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 CompletableFuture<Long> future = 
createFuture.handle((bucketId, ex) -> {
                     if (ex == null) {
                         immutableBucket.setSnapshotSegments(null);
+                        immutableBucket.asyncUpdateSnapshotLength();
                         log.info("[{}] Creat bucket snapshot finish, 
bucketKey: {}", dispatcher.getName(),
                                 immutableBucket.bucketKey());
+
+                        
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
+                                System.currentTimeMillis() - startTime);
+
                         return bucketId;
                     }
 
-                    //TODO Record create snapshot failed
-                    log.error("[{}] Failed to create bucket snapshot, 
bucketKey: {}",
-                            dispatcher.getName(), immutableBucket.bucketKey(), 
ex);
+                    log.error("[{}] Failed to create bucket snapshot, 
bucketKey: {}", dispatcher.getName(),
+                            immutableBucket.bucketKey(), ex);
+                    
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
 
                     // Put indexes back into the shared queue and downgrade to 
memory mode
                     synchronized (BucketDelayedDeliveryTracker.this) {
@@ -311,12 +322,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
                 && lastMutableBucket.size() >= minIndexCountPerBucket
                 && !lastMutableBucket.isEmpty()) {
+            long createStartTime = System.currentTimeMillis();
+            
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
             Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
                     lastMutableBucket.sealBucketAndAsyncPersistent(
                             this.timeStepPerBucketSnapshotSegmentInMillis,
                             this.maxIndexesPerBucketSnapshotSegment,
                             this.sharedBucketPriorityQueue);
-            afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+            afterCreateImmutableBucket(immutableBucketDelayedIndexPair, 
createStartTime);
             lastMutableBucket.resetLastMutableBucketRange();
 
             if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
@@ -374,7 +387,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         }
 
         if (minIndex >= 0) {
-            return values.subList(minIndex, minIndex + MAX_MERGE_NUM);
+            return values.subList(minIndex, minIndex + mergeNum);
         } else if (mergeNum > 2){
             return selectMergedBuckets(values, mergeNum - 1);
         } else {
@@ -400,6 +413,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
             immutableBucket.merging = true;
         }
+
+        long mergeStartTime = System.currentTimeMillis();
+        stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
         return 
asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
             synchronized (this) {
                 for (ImmutableBucket immutableBucket : 
toBeMergeImmutableBuckets) {
@@ -409,9 +425,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             if (ex != null) {
                 log.error("[{}] Failed to merge bucket snapshot, bucketKeys: 
{}",
                         dispatcher.getName(), bucketsStr, ex);
+
+                
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
             } else {
                 log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, 
bucketNum: {}",
                         dispatcher.getName(), bucketsStr, 
immutableBuckets.asMapOfRanges().size());
+
+                
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
+                        System.currentTimeMillis() - mergeStartTime);
             }
         });
     }
@@ -436,6 +457,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     })
                     .thenAccept(combinedDelayedIndexQueue -> {
                         synchronized (BucketDelayedDeliveryTracker.this) {
+                            long createStartTime = System.currentTimeMillis();
+                            
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
                             Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
                                     
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
                                             
timeStepPerBucketSnapshotSegmentInMillis,
@@ -461,12 +484,12 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                             }
                             
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
 
-                            
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+                            
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
 
                             
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
                                     .orElse(NULL_LONG_PROMISE).thenCompose(___ 
-> {
                                         List<CompletableFuture<Void>> 
removeFutures =
-                                                
buckets.stream().map(ImmutableBucket::asyncDeleteBucketSnapshot)
+                                                buckets.stream().map(bucket -> 
bucket.asyncDeleteBucketSnapshot(stats))
                                                         .toList();
                                         return 
FutureUtil.waitForAll(removeFutures);
                                     });
@@ -557,15 +580,17 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
                     if (bucket.currentSegmentEntryId == 
bucket.lastSegmentEntryId) {
                         
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, 
bucket.endLedgerId));
-                        bucket.asyncDeleteBucketSnapshot();
+                        bucket.asyncDeleteBucketSnapshot(stats);
                         continue;
                     }
 
+                    long loadStartTime = System.currentTimeMillis();
+                    
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
                     
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
                         if (CollectionUtils.isEmpty(indexList)) {
                             immutableBuckets.asMapOfRanges()
                                     .remove(Range.closed(bucket.startLedgerId, 
bucket.endLedgerId));
-                            bucket.asyncDeleteBucketSnapshot();
+                            bucket.asyncDeleteBucketSnapshot(stats);
                             return;
                         }
                         DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
@@ -583,9 +608,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
                             log.error("[{}] Failed to load bucket snapshot 
segment, bucketKey: {}, segmentEntryId: {}",
                                     dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1, ex);
+
+                            
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
                         } else {
                             log.info("[{}] Load next bucket snapshot segment 
finish, bucketKey: {}, segmentEntryId: {}",
                                     dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1);
+
+                            
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
+                                    System.currentTimeMillis() - 
loadStartTime);
                         }
                     }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), 
TimeUnit.SECONDS);
                 } catch (Exception e) {
@@ -645,7 +675,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         Iterator<ImmutableBucket> iterator = 
immutableBuckets.asMapOfRanges().values().iterator();
         while (iterator.hasNext()) {
             ImmutableBucket bucket = iterator.next();
-            futures.add(bucket.clear());
+            futures.add(bucket.clear(stats));
             numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
             iterator.remove();
         }
@@ -661,7 +691,6 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 .orElse(false);
     }
 
-    @Override
     public boolean containsMessage(long ledgerId, long entryId) {
         if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
             return true;
@@ -670,4 +699,15 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return findImmutableBucket(ledgerId).map(bucket -> 
bucket.containsMessage(ledgerId, entryId))
                 .orElse(false);
     }
+
+    public Map<String, TopicMetricBean> genTopicMetricMap() {
+        stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
+        
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + 
this.lastMutableBucket.size());
+        MutableLong totalSnapshotLength = new MutableLong();
+        immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
+            totalSnapshotLength.add(immutableBucket.getSnapshotLength());
+        });
+        stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
+        return stats.genTopicMetricMap();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
new file mode 100644
index 00000000000..68788c359d5
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
+
+public class BucketDelayedMessageIndexStats {
+
+    private static final long[] BUCKETS = new long[]{50, 100, 500, 1000, 5000, 
30000, 60000};
+
+    enum State {
+        succeed,
+        failed,
+        all
+    }
+
+    enum Type {
+        create,
+        load,
+        delete,
+        merge
+    }
+
+    private static final String BUCKET_TOTAL_NAME = 
"pulsar_delayed_message_index_bucket_total";
+    private static final String INDEX_LOADED_NAME = 
"pulsar_delayed_message_index_loaded";
+    private static final String SNAPSHOT_SIZE_BYTES_NAME = 
"pulsar_delayed_message_index_bucket_snapshot_size_bytes";
+    private static final String OP_COUNT_NAME = 
"pulsar_delayed_message_index_bucket_op_count";
+    private static final String OP_LATENCY_NAME = 
"pulsar_delayed_message_index_bucket_op_latency_ms";
+
+    private final AtomicInteger delayedMessageIndexBucketTotal = new 
AtomicInteger();
+    private final AtomicLong delayedMessageIndexLoaded = new AtomicLong();
+    private final AtomicLong delayedMessageIndexBucketSnapshotSizeBytes = new 
AtomicLong();
+    private final Map<String, StatsBuckets> 
delayedMessageIndexBucketOpLatencyMs = new ConcurrentHashMap<>();
+    private final Map<String, LongAdder> delayedMessageIndexBucketOpCount = 
new ConcurrentHashMap<>();
+
+    public BucketDelayedMessageIndexStats() {
+    }
+
+    public Map<String, TopicMetricBean> genTopicMetricMap() {
+        Map<String, TopicMetricBean> metrics = new HashMap<>();
+
+        metrics.put(BUCKET_TOTAL_NAME,
+                new TopicMetricBean(BUCKET_TOTAL_NAME, 
delayedMessageIndexBucketTotal.get(), null));
+
+        metrics.put(INDEX_LOADED_NAME,
+                new TopicMetricBean(INDEX_LOADED_NAME, 
delayedMessageIndexLoaded.get(), null));
+
+        metrics.put(SNAPSHOT_SIZE_BYTES_NAME,
+                new TopicMetricBean(SNAPSHOT_SIZE_BYTES_NAME, 
delayedMessageIndexBucketSnapshotSizeBytes.get(), null));
+
+        delayedMessageIndexBucketOpCount.forEach((k, count) -> {
+            String[] labels = splitKey(k);
+            String[] labelsAndValues = new String[] {"state", labels[0], 
"type", labels[1]};
+            String key = OP_COUNT_NAME + joinKey(labelsAndValues);
+            metrics.put(key, new TopicMetricBean(OP_COUNT_NAME, 
count.sumThenReset(), labelsAndValues));
+        });
+
+        delayedMessageIndexBucketOpLatencyMs.forEach((typeName, statsBuckets) 
-> {
+            statsBuckets.refresh();
+            long[] buckets = statsBuckets.getBuckets();
+            for (int i = 0; i < buckets.length; i++) {
+                long count = buckets[i];
+                if (count == 0L) {
+                    continue;
+                }
+                String quantile;
+                if (i == BUCKETS.length) {
+                    quantile = "overflow";
+                } else {
+                    quantile = String.valueOf(BUCKETS[i]);
+                }
+                String[] labelsAndValues = new String[] {"type", typeName, 
"quantile", quantile};
+                String key = OP_LATENCY_NAME + joinKey(labelsAndValues);
+
+                metrics.put(key, new TopicMetricBean(OP_LATENCY_NAME, count, 
labelsAndValues));
+            }
+            String[] labelsAndValues = new String[] {"type", typeName};
+            metrics.put(OP_LATENCY_NAME + "_count" + joinKey(labelsAndValues),
+                    new TopicMetricBean(OP_LATENCY_NAME + "_count", 
statsBuckets.getCount(), labelsAndValues));
+            metrics.put(OP_LATENCY_NAME + "_sum" + joinKey(labelsAndValues),
+                    new TopicMetricBean(OP_LATENCY_NAME + "_sum", 
statsBuckets.getSum(), labelsAndValues));
+        });
+
+        return metrics;
+    }
+
+    public void recordNumOfBuckets(int numOfBuckets) {
+        delayedMessageIndexBucketTotal.set(numOfBuckets);
+    }
+
+    public void recordDelayedMessageIndexLoaded(long num) {
+        delayedMessageIndexLoaded.set(num);
+    }
+
+    public void recordBucketSnapshotSizeBytes(long sizeBytes) {
+        delayedMessageIndexBucketSnapshotSizeBytes.set(sizeBytes);
+    }
+
+    public void recordTriggerEvent(Type eventType) {
+        
delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.all.name(), 
eventType.name()),
+                k -> new LongAdder()).increment();
+    }
+
+    public void recordSuccessEvent(Type eventType, long cost) {
+        
delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.succeed.name(), 
eventType.name()),
+                k -> new LongAdder()).increment();
+        delayedMessageIndexBucketOpLatencyMs.computeIfAbsent(eventType.name(),
+                k -> new StatsBuckets(BUCKETS)).addValue(cost);
+    }
+
+    public void recordFailEvent(Type eventType) {
+        
delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.failed.name(), 
eventType.name()),
+                k -> new LongAdder()).increment();
+    }
+
+    public static String joinKey(String... values) {
+        return String.join("_", values);
+    }
+
+    public static String[] splitKey(String key) {
+        return key.split("_");
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 969d326e281..82e98cefa5d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -131,6 +131,9 @@ class ImmutableBucket extends Bucket {
                         
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
                                 snapshotSegment.getIndexesList();
                         this.setCurrentSegmentEntryId(nextSegmentEntryId);
+                        if (isRecover) {
+                            this.asyncUpdateSnapshotLength();
+                        }
                         return indexList;
                     });
         });
@@ -175,7 +178,9 @@ class ImmutableBucket extends Bucket {
         }, BucketSnapshotPersistenceException.class, MaxRetryTimes);
     }
 
-    CompletableFuture<Void> asyncDeleteBucketSnapshot() {
+    CompletableFuture<Void> 
asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats) {
+        long deleteStartTime = System.currentTimeMillis();
+        stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete);
         String bucketKey = bucketKey();
         long bucketId = getAndUpdateBucketId();
         return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
@@ -184,16 +189,33 @@ class ImmutableBucket extends Bucket {
                     if (ex != null) {
                         log.error("[{}] Failed to delete bucket snapshot, 
bucketId: {}, bucketKey: {}",
                                 dispatcherName, bucketId, bucketKey, ex);
+
+                        
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete);
                     } else {
                         log.info("[{}] Delete bucket snapshot finish, 
bucketId: {}, bucketKey: {}",
                                  dispatcherName, bucketId, bucketKey);
+
+                        
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
+                                System.currentTimeMillis() - deleteStartTime);
                     }
         });
     }
 
-    CompletableFuture<Void> clear() {
+    CompletableFuture<Void> clear(BucketDelayedMessageIndexStats stats) {
         delayedIndexBitMap.clear();
         return 
getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).exceptionally(e -> null)
-                .thenCompose(__ -> asyncDeleteBucketSnapshot());
+                .thenCompose(__ -> asyncDeleteBucketSnapshot(stats));
+    }
+
+    protected CompletableFuture<Long> asyncUpdateSnapshotLength() {
+        long bucketId = getAndUpdateBucketId();
+        return 
bucketSnapshotStorage.getBucketSnapshotLength(bucketId).whenComplete((length, 
ex) -> {
+            if (ex != null) {
+                log.error("[{}] Failed to get snapshot length, bucketId: {}, 
bucketKey: {}",
+                        dispatcherName, bucketId, bucketKey(), ex);
+            } else {
+                setSnapshotLength(length);
+            }
+        });
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index f8a4ecc7a4d..e49ebe9606e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -62,8 +62,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
             final long timeStepPerBucketSnapshotSegment, final int 
maxIndexesPerBucketSnapshotSegment,
             TripleLongPriorityQueue sharedQueue, DelayedIndexQueue 
delayedIndexQueue, final long startLedgerId,
             final long endLedgerId) {
-        log.info("[{}] Creating bucket snapshot, startLedgerId: {}, 
endLedgerId: {}", dispatcherName,
-                startLedgerId, endLedgerId);
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Creating bucket snapshot, startLedgerId: {}, 
endLedgerId: {}", dispatcherName,
+                    startLedgerId, endLedgerId);
+        }
 
         if (delayedIndexQueue.isEmpty()) {
             return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index e5c9e85bac3..7ff6e72d02a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -72,6 +72,7 @@ import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
@@ -332,7 +333,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     Predicate<PositionImpl> skipCondition = null;
                     final DelayedDeliveryTracker deliveryTracker = 
delayedDeliveryTracker.get();
                     if (deliveryTracker instanceof 
BucketDelayedDeliveryTracker) {
-                        skipCondition = position -> deliveryTracker
+                        skipCondition = position -> 
((BucketDelayedDeliveryTracker) deliveryTracker)
                                 .containsMessage(position.getLedgerId(), 
position.getEntryId());
                     }
                     cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, 
bytesToRead, this, ReadType.Normal,
@@ -1180,6 +1181,18 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         return 0;
     }
 
+    public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+        if (delayedDeliveryTracker.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        if (delayedDeliveryTracker.get() instanceof 
BucketDelayedDeliveryTracker) {
+            return ((BucketDelayedDeliveryTracker) 
delayedDeliveryTracker.get()).genTopicMetricMap();
+        }
+
+        return Collections.emptyMap();
+    }
+
     public ManagedCursor getCursor() {
         return cursor;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index bdb3c9fc391..4ed191a9b4f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1147,6 +1147,9 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
             subStats.delayedMessageIndexSizeInBytes =
                     ((PersistentDispatcherMultipleConsumers) 
dispatcher).getDelayedTrackerMemoryUsage();
+
+            subStats.bucketDelayedIndexStats =
+                    ((PersistentDispatcherMultipleConsumers) 
dispatcher).getBucketDelayedIndexStats();
         }
 
         if (Subscription.isIndividualAckMode(subType)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 82a4f531235..fa08330ff3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -153,6 +153,7 @@ import 
org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -2183,6 +2184,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             stats.nonContiguousDeletedMessagesRangesSerializedSize +=
                     subStats.nonContiguousDeletedMessagesRangesSerializedSize;
             stats.delayedMessageIndexSizeInBytes += 
subStats.delayedMessageIndexSizeInBytes;
+
+            subStats.bucketDelayedIndexStats.forEach((k, v) -> {
+                TopicMetricBean topicMetricBean =
+                        stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> 
new TopicMetricBean());
+                topicMetricBean.name = v.name;
+                topicMetricBean.labelsAndValues = v.labelsAndValues;
+                topicMetricBean.value += v.value;
+            });
         });
 
         replicators.forEach((cluster, replicator) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 0a905daa341..ea77bd69302 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats.prometheus;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.compaction.CompactionRecord;
 
 public class AggregatedNamespaceStats {
@@ -65,6 +66,8 @@ public class AggregatedNamespaceStats {
     StatsBuckets compactionLatencyBuckets = new 
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
     int delayedMessageIndexSizeInBytes;
 
+    Map<String, TopicMetricBean> bucketDelayedIndexStats = new HashMap<>();
+
     void updateStats(TopicStats stats) {
         topicsCount++;
 
@@ -83,6 +86,14 @@ public class AggregatedNamespaceStats {
         msgOutCounter += stats.msgOutCounter;
         delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;
 
+        stats.bucketDelayedIndexStats.forEach((k, v) -> {
+            TopicMetricBean topicMetricBean =
+                    bucketDelayedIndexStats.computeIfAbsent(k, __ -> new 
TopicMetricBean());
+            topicMetricBean.name = v.name;
+            topicMetricBean.labelsAndValues = v.labelsAndValues;
+            topicMetricBean.value += v.value;
+        });
+
         this.ongoingTxnCount += stats.ongoingTxnCount;
         this.abortedTxnCount += stats.abortedTxnCount;
         this.committedTxnCount += stats.committedTxnCount;
@@ -132,6 +143,13 @@ public class AggregatedNamespaceStats {
             subsStats.filterRejectedMsgCount += as.filterRejectedMsgCount;
             subsStats.filterRescheduledMsgCount += 
as.filterRescheduledMsgCount;
             subsStats.delayedMessageIndexSizeInBytes += 
as.delayedMessageIndexSizeInBytes;
+            as.bucketDelayedIndexStats.forEach((k, v) -> {
+                TopicMetricBean topicMetricBean =
+                        subsStats.bucketDelayedIndexStats.computeIfAbsent(k, 
__ -> new TopicMetricBean());
+                topicMetricBean.name = v.name;
+                topicMetricBean.labelsAndValues = v.labelsAndValues;
+                topicMetricBean.value += v.value;
+            });
             as.consumerStat.forEach((c, v) -> {
                 AggregatedConsumerStats consumerStats =
                         subsStats.consumerStat.computeIfAbsent(c, k -> new 
AggregatedConsumerStats());
@@ -172,5 +190,6 @@ public class AggregatedNamespaceStats {
         replicationStats.clear();
         subscriptionStats.clear();
         delayedMessageIndexSizeInBytes = 0;
+        bucketDelayedIndexStats.clear();
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index 383c671754d..da0324c5565 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats.prometheus;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 
 public class AggregatedSubscriptionStats {
 
@@ -75,4 +76,6 @@ public class AggregatedSubscriptionStats {
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new 
HashMap<>();
 
     long delayedMessageIndexSizeInBytes;
+
+    public Map<String, TopicMetricBean> bucketDelayedIndexStats = new 
HashMap<>();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 918aef539cf..32fb06ea3ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -155,6 +156,7 @@ public class NamespaceStatsAggregator {
         subsStats.filterRejectedMsgCount = 
subscriptionStats.filterRejectedMsgCount;
         subsStats.filterRescheduledMsgCount = 
subscriptionStats.filterRescheduledMsgCount;
         subsStats.delayedMessageIndexSizeInBytes = 
subscriptionStats.delayedMessageIndexSizeInBytes;
+        subsStats.bucketDelayedIndexStats = 
subscriptionStats.bucketDelayedIndexStats;
     }
 
     private static void getTopicStats(Topic topic, TopicStats stats, boolean 
includeConsumerMetrics,
@@ -197,6 +199,7 @@ public class NamespaceStatsAggregator {
         stats.averageMsgSize = tStatus.averageMsgSize;
         stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
         stats.delayedMessageIndexSizeInBytes = 
tStatus.delayedMessageIndexSizeInBytes;
+        stats.bucketDelayedIndexStats = tStatus.bucketDelayedIndexStats;
         stats.abortedTxnCount = tStatus.abortedTxnCount;
         stats.ongoingTxnCount = tStatus.ongoingTxnCount;
         stats.committedTxnCount = tStatus.committedTxnCount;
@@ -379,6 +382,10 @@ public class NamespaceStatsAggregator {
         writeMetric(stream, "pulsar_delayed_message_index_size_bytes", 
stats.delayedMessageIndexSizeInBytes, cluster,
                 namespace);
 
+        stats.bucketDelayedIndexStats.forEach((k, metric) -> {
+            writeMetric(stream, metric.name, metric.value, cluster, namespace, 
metric.labelsAndValues);
+        });
+
         writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
 
         stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
@@ -472,8 +479,10 @@ public class NamespaceStatsAggregator {
     }
 
     private static void writeMetric(PrometheusMetricStreams stream, String 
metricName, Number value, String cluster,
-                                    String namespace) {
-        stream.writeSample(metricName, value, "cluster", cluster, "namespace", 
namespace);
+                                    String namespace, String... 
extraLabelsAndValues) {
+        String[] labelsAndValues = new String[]{"cluster", cluster, 
"namespace", namespace};
+        String[] labels = ArrayUtils.addAll(labelsAndValues, 
extraLabelsAndValues);
+        stream.writeSample(metricName, value, labels);
     }
 
     private static void writeReplicationStat(PrometheusMetricStreams stream, 
String metricName,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index abc6979484e..3a2563a8758 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.compaction.CompactionRecord;
 import org.apache.pulsar.compaction.CompactorMXBean;
 
@@ -70,6 +71,8 @@ class TopicStats {
     StatsBuckets compactionLatencyBuckets = new 
StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
     public long delayedMessageIndexSizeInBytes;
 
+    Map<String, TopicMetricBean> bucketDelayedIndexStats = new HashMap<>();
+
     public void reset() {
         subscriptionsCount = 0;
         producersCount = 0;
@@ -107,6 +110,7 @@ class TopicStats {
         compactionCompactedEntriesSize = 0;
         compactionLatencyBuckets.reset();
         delayedMessageIndexSizeInBytes = 0;
+        bucketDelayedIndexStats.clear();
     }
 
     public static void printTopicStats(PrometheusMetricStreams stream, 
TopicStats stats,
@@ -162,6 +166,11 @@ class TopicStats {
         writeMetric(stream, "pulsar_delayed_message_index_size_bytes", 
stats.delayedMessageIndexSizeInBytes,
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
 
+        for (TopicMetricBean topicMetricBean : 
stats.bucketDelayedIndexStats.values()) {
+            writeTopicMetric(stream, topicMetricBean.name, 
topicMetricBean.value, cluster, namespace,
+                    topic, splitTopicAndPartitionIndexLabel, 
topicMetricBean.labelsAndValues);
+        }
+
         long[] latencyBuckets = 
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
         writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
                 latencyBuckets[0], cluster, namespace, topic, 
splitTopicAndPartitionIndexLabel);
@@ -310,6 +319,13 @@ class TopicStats {
                     subsStats.delayedMessageIndexSizeInBytes, cluster, 
namespace, topic, sub,
                     splitTopicAndPartitionIndexLabel);
 
+            final String[] subscriptionLabel = {"subscription", sub};
+            for (TopicMetricBean topicMetricBean : 
subsStats.bucketDelayedIndexStats.values()) {
+                String[] labelsAndValues = 
ArrayUtils.addAll(subscriptionLabel, topicMetricBean.labelsAndValues);
+                writeTopicMetric(stream, topicMetricBean.name, 
topicMetricBean.value, cluster, namespace,
+                        topic, splitTopicAndPartitionIndexLabel, 
labelsAndValues);
+            }
+
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 writeConsumerMetric(stream, 
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
                         cluster, namespace, topic, sub, c, 
splitTopicAndPartitionIndexLabel);
@@ -409,6 +425,12 @@ class TopicStats {
             writeMetric(stream, "pulsar_compaction_latency_count",
                     stats.compactionLatencyBuckets.getCount(), cluster, 
namespace, topic,
                     splitTopicAndPartitionIndexLabel);
+
+            for (TopicMetricBean topicMetricBean : 
stats.bucketDelayedIndexStats.values()) {
+                String[] labelsAndValues = topicMetricBean.labelsAndValues;
+                writeTopicMetric(stream, topicMetricBean.name, 
topicMetricBean.value, cluster, namespace,
+                        topic, splitTopicAndPartitionIndexLabel, 
labelsAndValues);
+            }
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index cf7310c7067..9e924bdeda3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -174,7 +174,7 @@ public class MockBucketSnapshotStorage implements 
BucketSnapshotStorage {
             long length = 0;
             List<ByteBuf> bufList = this.bucketSnapshots.get(bucketId);
             for (ByteBuf byteBuf : bufList) {
-                length += byteBuf.array().length;
+                length += byteBuf.readableBytes();
             }
             return length;
         }, executorService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 717bada7705..95234d688f6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest;
-import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
 import org.apache.pulsar.broker.delayed.MockManagedCursor;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -158,7 +157,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
     }
 
     @Test(dataProvider = "delayedTracker")
-    public void testContainsMessage(DelayedDeliveryTracker tracker) {
+    public void testContainsMessage(BucketDelayedDeliveryTracker tracker) {
         tracker.addMessage(1, 1, 10);
         tracker.addMessage(2, 2, 20);
 
@@ -191,6 +190,12 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         clockTime.set(1 * 10);
 
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(
+                    
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> 
x.merging ||
+                            !x.getSnapshotCreateFuture().get().isDone()));
+        });
+
         assertTrue(tracker.hasMessageAvailable());
         Set<PositionImpl> scheduledMessages = 
tracker.getScheduledMessages(100);
 
@@ -202,16 +207,16 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         clockTime.set(30 * 10);
 
-        tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
-                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1,50);
+        BucketDelayedDeliveryTracker tracker2 = new 
BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
+                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50);
 
-        assertFalse(tracker.containsMessage(101, 101));
-        assertEquals(tracker.getNumberOfDelayedMessages(), 70);
+        assertFalse(tracker2.containsMessage(101, 101));
+        assertEquals(tracker2.getNumberOfDelayedMessages(), 70);
 
         clockTime.set(100 * 10);
 
-        assertTrue(tracker.hasMessageAvailable());
-        scheduledMessages = tracker.getScheduledMessages(70);
+        assertTrue(tracker2.hasMessageAvailable());
+        scheduledMessages = tracker2.getScheduledMessages(70);
 
         assertEquals(scheduledMessages.size(), 70);
 
@@ -221,7 +226,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
             i++;
         }
 
-        tracker.close();
+        tracker2.close();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 09b7cbbf1b9..5480a2e7a70 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -19,18 +19,26 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Multimap;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
@@ -162,4 +170,139 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
             }
         }
     }
+
+
+    @Test
+    public void testBucketDelayedIndexMetrics() throws Exception {
+        cleanup();
+        setup();
+
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/testBucketDelayedIndexMetrics");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test_sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test_sub2")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        final int N = 101;
+
+        for (int i = 0; i < N; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .deliverAfter(3600 + i, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+        producer.flush();
+
+        Thread.sleep(2000);
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, true, true, output);
+        String metricsStr = output.toString(StandardCharsets.UTF_8);
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        List<PrometheusMetricsTest.Metric> bucketsMetrics =
+                
metricsMap.get("pulsar_delayed_message_index_bucket_total").stream()
+                        .filter(metric -> 
metric.tags.get("topic").equals(topic)).toList();
+        MutableInt bucketsSum = new MutableInt();
+        bucketsMetrics.stream().filter(metric -> 
metric.tags.containsKey("subscription")).forEach(metric -> {
+            assertEquals(3, metric.value);
+            bucketsSum.add(metric.value);
+        });
+        assertEquals(6, bucketsSum.intValue());
+        Optional<PrometheusMetricsTest.Metric> bucketsTopicMetric =
+                bucketsMetrics.stream().filter(metric -> 
!metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(bucketsTopicMetric.isPresent());
+        assertEquals(bucketsSum.intValue(), bucketsTopicMetric.get().value);
+
+        List<PrometheusMetricsTest.Metric> loadedIndexMetrics =
+                metricsMap.get("pulsar_delayed_message_index_loaded").stream()
+                        .filter(metric -> 
metric.tags.get("topic").equals(topic)).toList();
+        MutableInt loadedIndexSum = new MutableInt();
+        long count = loadedIndexMetrics.stream().filter(metric -> 
metric.tags.containsKey("subscription")).peek(metric -> {
+            assertTrue(metric.value > 0 && metric.value <= N);
+            loadedIndexSum.add(metric.value);
+        }).count();
+        assertEquals(2, count);
+        Optional<PrometheusMetricsTest.Metric> loadedIndexTopicMetrics =
+                bucketsMetrics.stream().filter(metric -> 
!metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(loadedIndexTopicMetrics.isPresent());
+        assertEquals(loadedIndexSum.intValue(), 
loadedIndexTopicMetrics.get().value);
+
+        List<PrometheusMetricsTest.Metric> snapshotSizeBytesMetrics =
+                
metricsMap.get("pulsar_delayed_message_index_bucket_snapshot_size_bytes").stream()
+                        .filter(metric -> 
metric.tags.get("topic").equals(topic)).toList();
+        MutableInt snapshotSizeBytesSum = new MutableInt();
+        count = snapshotSizeBytesMetrics.stream().filter(metric -> 
metric.tags.containsKey("subscription"))
+                .peek(metric -> {
+                    assertTrue(metric.value > 0);
+                    snapshotSizeBytesSum.add(metric.value);
+                }).count();
+        assertEquals(2, count);
+        Optional<PrometheusMetricsTest.Metric> snapshotSizeBytesTopicMetrics =
+                snapshotSizeBytesMetrics.stream().filter(metric -> 
!metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(snapshotSizeBytesTopicMetrics.isPresent());
+        assertEquals(snapshotSizeBytesSum.intValue(), 
snapshotSizeBytesTopicMetrics.get().value);
+
+        List<PrometheusMetricsTest.Metric> opCountMetrics =
+                
metricsMap.get("pulsar_delayed_message_index_bucket_op_count").stream()
+                        .filter(metric -> 
metric.tags.get("topic").equals(topic)).toList();
+        MutableInt opCountMetricsSum = new MutableInt();
+        count = opCountMetrics.stream()
+                .filter(metric -> metric.tags.get("state").equals("succeed") 
&& metric.tags.get("type").equals("create")
+                        && metric.tags.containsKey("subscription"))
+                .peek(metric -> {
+                    assertTrue(metric.value >= 2);
+                    opCountMetricsSum.add(metric.value);
+                }).count();
+        assertEquals(2, count);
+        Optional<PrometheusMetricsTest.Metric> opCountTopicMetrics =
+                opCountMetrics.stream()
+                        .filter(metric -> 
metric.tags.get("state").equals("succeed") && metric.tags.get("type")
+                                .equals("create") && 
!metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(opCountTopicMetrics.isPresent());
+        assertEquals(opCountMetricsSum.intValue(), 
opCountTopicMetrics.get().value);
+
+        List<PrometheusMetricsTest.Metric> opLatencyMetrics =
+                
metricsMap.get("pulsar_delayed_message_index_bucket_op_latency_ms").stream()
+                        .filter(metric -> 
metric.tags.get("topic").equals(topic)).toList();
+        MutableInt opLatencyMetricsSum = new MutableInt();
+        count = opLatencyMetrics.stream()
+                .filter(metric -> metric.tags.get("type").equals("create")
+                        && metric.tags.containsKey("subscription"))
+                .peek(metric -> {
+                    assertTrue(metric.tags.containsKey("quantile"));
+                    opLatencyMetricsSum.add(metric.value);
+                }).count();
+        assertTrue(count >= 2);
+        Optional<PrometheusMetricsTest.Metric> opLatencyTopicMetrics =
+                opCountMetrics.stream()
+                        .filter(metric -> 
metric.tags.get("type").equals("create")
+                                && 
!metric.tags.containsKey("subscription")).findFirst();
+        assertTrue(opLatencyTopicMetrics.isPresent());
+        assertEquals(opLatencyMetricsSum.intValue(), 
opLatencyTopicMetrics.get().value);
+
+        ByteArrayOutputStream namespaceOutput = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, true, true, 
namespaceOutput);
+        Multimap<String, PrometheusMetricsTest.Metric> namespaceMetricsMap = 
PrometheusMetricsTest.parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8));
+
+        Optional<PrometheusMetricsTest.Metric> namespaceMetric =
+                
namespaceMetricsMap.get("pulsar_delayed_message_index_bucket_total").stream().findFirst();
+        assertTrue(namespaceMetric.isPresent());
+        assertEquals(6, namespaceMetric.get().value);
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 9c7e24ba021..25fa666523f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -134,6 +134,8 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
     /** The size of InMemoryDelayedDeliveryTracer memory usage. */
     public long delayedMessageIndexSizeInBytes;
 
+    public Map<String, TopicMetricBean> bucketDelayedIndexStats;
+
     /** SubscriptionProperties (key/value strings) associated with this 
subscribe. */
     public Map<String, String> subscriptionProperties;
 
@@ -149,6 +151,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         this.consumers = new ArrayList<>();
         this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
         this.subscriptionProperties = new HashMap<>();
+        this.bucketDelayedIndexStats = new HashMap<>();
     }
 
     public void reset() {
@@ -175,6 +178,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         filterAcceptedMsgCount = 0;
         filterRejectedMsgCount = 0;
         filterRescheduledMsgCount = 0;
+        bucketDelayedIndexStats.clear();
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
@@ -215,6 +219,14 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount;
         this.filterRejectedMsgCount += stats.filterRejectedMsgCount;
         this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount;
+        stats.bucketDelayedIndexStats.forEach((k, v) -> {
+            TopicMetricBean topicMetricBean =
+                    this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new 
TopicMetricBean());
+            topicMetricBean.name = v.name;
+            topicMetricBean.labelsAndValues = v.labelsAndValues;
+            topicMetricBean.value += v.value;
+        });
+
         return this;
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicMetricBean.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicMetricBean.java
new file mode 100644
index 00000000000..e01a9d7aa71
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicMetricBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor // Lombok: constructor parameters follow field order (name, value, labelsAndValues)
+public class TopicMetricBean { // one named metric sample; stored as values of the bucketDelayedIndexStats maps
+    public String name; // name identifying this metric; overwritten from the incoming bean when stats are merged
+    public double value; // numeric sample; accumulated with += when stats objects are merged
+    public String[] labelsAndValues; // presumably alternating label names and values -- TODO confirm with producer
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 12d30124f7d..c9c4739b904 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -139,6 +139,9 @@ public class TopicStatsImpl implements TopicStats {
     /** The size of InMemoryDelayedDeliveryTracer memory usage. */
     public long delayedMessageIndexSizeInBytes;
 
+    /** Map of bucket delayed index statistics. */
+    public Map<String, TopicMetricBean> bucketDelayedIndexStats;
+
     /** The compaction stats. */
     public CompactionStatsImpl compaction;
 
@@ -182,6 +185,7 @@ public class TopicStatsImpl implements TopicStats {
         this.subscriptions = new HashMap<>();
         this.replication = new TreeMap<>();
         this.compaction = new CompactionStatsImpl();
+        this.bucketDelayedIndexStats = new HashMap<>();
     }
 
     public void reset() {
@@ -214,6 +218,7 @@ public class TopicStatsImpl implements TopicStats {
         this.delayedMessageIndexSizeInBytes = 0;
         this.compaction.reset();
         this.ownerBroker = null;
+        this.bucketDelayedIndexStats.clear();
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
@@ -244,6 +249,14 @@ public class TopicStatsImpl implements TopicStats {
         this.abortedTxnCount = stats.abortedTxnCount;
         this.committedTxnCount = stats.committedTxnCount;
 
+        stats.bucketDelayedIndexStats.forEach((k, v) -> {
+            TopicMetricBean topicMetricBean =
+                    this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new 
TopicMetricBean());
+            topicMetricBean.name = v.name;
+            topicMetricBean.labelsAndValues = v.labelsAndValues;
+            topicMetricBean.value += v.value;
+        });
+
         for (int index = 0; index < stats.getPublishers().size(); index++) {
            PublisherStats s = stats.getPublishers().get(index);
            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {

Reply via email to