This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f4aaea84d [feat][broker][PIP-195] Implement delayed message index 
bucket snapshot (create/load) - part2 (#17611)
e6f4aaea84d is described below

commit e6f4aaea84d027cea3016683815cc4e1a890cf88
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Nov 5 09:50:19 2022 +0800

    [feat][broker][PIP-195] Implement delayed message index bucket snapshot 
(create/load) - part2 (#17611)
---
 .../apache/bookkeeper/mledger/util/Futures.java    |   7 +-
 .../mledger/impl/ManagedCursorPropertiesTest.java  |   6 +-
 .../delayed/AbstractDelayedDeliveryTracker.java    | 162 ++++++++
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 161 ++------
 .../pulsar/broker/delayed/bucket/Bucket.java       | 145 ++++++++
 .../bucket/BucketDelayedDeliveryTracker.java       | 324 ++++++++++++++++
 .../{ => bucket}/BucketSnapshotStorage.java        |   2 +-
 .../broker/delayed/bucket/ImmutableBucket.java     | 101 +++++
 .../broker/delayed/bucket/MutableBucket.java       | 203 ++++++++++
 .../broker/delayed/bucket/package-info.java}       |  27 +-
 .../PersistentDispatcherMultipleConsumers.java     |   4 +
 .../DelayedMessageIndexBucketSnapshotFormat.proto  |   4 +-
 .../delayed/AbstractDeliveryTrackerTest.java       | 240 ++++++++++++
 .../delayed/BucketDelayedDeliveryTrackerTest.java  | 159 ++++++++
 .../delayed/InMemoryDeliveryTrackerTest.java       | 412 +++++----------------
 .../broker/delayed/MockBucketSnapshotStorage.java  | 160 ++++++++
 .../pulsar/broker/delayed/MockManagedCursor.java   | 412 +++++++++++++++++++++
 17 files changed, 2039 insertions(+), 490 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 637de8a79fc..dc1d1eb6c9a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -71,14 +71,15 @@ public class Futures {
     }
 
     public static <T> CompletableFuture<T> 
executeWithRetry(Supplier<CompletableFuture<T>> op,
-                                                            Class<? extends 
Exception> needRetryExceptionClass) {
+                                                            Class<? extends 
Exception> needRetryExceptionClass,
+                                                            int maxRetryTimes) 
{
         CompletableFuture<T> resultFuture = new CompletableFuture<>();
         op.get().whenComplete((res, ex) -> {
             if (ex == null) {
                 resultFuture.complete(res);
             } else {
-                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
-                    executeWithRetry(op, 
needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass()) && 
maxRetryTimes > 0) {
+                    executeWithRetry(op, needRetryExceptionClass, 
maxRetryTimes - 1).whenComplete((res2, ex2) -> {
                         if (ex2 == null) {
                             resultFuture.complete(res2);
                         } else {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index f0d40835656..2ef8f3eae32 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -247,13 +247,13 @@ public class ManagedCursorPropertiesTest extends 
MockedBookKeeperTestCase {
         map.put("c", "3");
 
         futures.add(executeWithRetry(() -> c1.setCursorProperties(map),
-                ManagedLedgerException.BadVersionException.class));
+                ManagedLedgerException.BadVersionException.class, 3));
 
         futures.add(executeWithRetry(() -> c1.putCursorProperty("a", "2"),
-                ManagedLedgerException.BadVersionException.class));
+                ManagedLedgerException.BadVersionException.class, 3));
 
         futures.add(executeWithRetry(() -> c1.removeCursorProperty("c"),
-                ManagedLedgerException.BadVersionException.class));
+                ManagedLedgerException.BadVersionException.class, 3));
 
         for (CompletableFuture<Void> future : futures) {
             future.get();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
new file mode 100644
index 00000000000..5c99e4c307d
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+
+@Slf4j
+public abstract class AbstractDelayedDeliveryTracker implements 
DelayedDeliveryTracker, TimerTask {
+
+    protected final PersistentDispatcherMultipleConsumers dispatcher;
+
+    // Reference to the shared (per-broker) timer for delayed delivery
+    protected final Timer timer;
+
+    // Current timeout or null if not set
+    protected Timeout timeout;
+
+    // Timestamp at which the timeout is currently set
+    private long currentTimeoutTarget;
+
+    // Last time the TimerTask was triggered for this class
+    private long lastTickRun;
+
+    protected long tickTimeMillis;
+
+    protected final Clock clock;
+
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+    public 
AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
+                                          long tickTimeMillis,
+                                          boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict);
+    }
+
+    public 
AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
+                                          long tickTimeMillis, Clock clock,
+                                          boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
+        this.dispatcher = dispatcher;
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+    }
+
+
+    /**
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow 
for early delivery by as much as the
+     * {@link #tickTimeMillis} because it is a slight optimization to let 
messages skip going back into the delay
+     * tracker for a brief amount of time when we're already trying to 
dispatch to the consumer.
+     *
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the 
current time to determine when messages
+     * can be delivered. As a consequence, there are two delays that will 
affect delivery. The first is the
+     * {@link #tickTimeMillis} and the second is the {@link Timer}'s 
granularity.
+     *
+     * @return the cutoff time to determine whether a message is ready to 
deliver to the consumer
+     */
+    protected long getCutoffTime() {
+        return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : 
clock.millis() + tickTimeMillis;
+    }
+
+    public void resetTickTime(long tickTime) {
+        if (this.tickTimeMillis != tickTime) {
+            this.tickTimeMillis = tickTime;
+        }
+    }
+
+    protected void updateTimer() {
+        if (getNumberOfDelayedMessages() == 0) {
+            if (timeout != null) {
+                currentTimeoutTarget = -1;
+                timeout.cancel();
+                timeout = null;
+            }
+            return;
+        }
+        long timestamp = nextDeliveryTime();
+        if (timestamp == currentTimeoutTarget) {
+            // The timer is already set to the correct target time
+            return;
+        }
+
+        if (timeout != null) {
+            timeout.cancel();
+        }
+
+        long now = clock.millis();
+        long delayMillis = timestamp - now;
+
+        if (delayMillis < 0) {
+            // There are messages that are already ready to be delivered. If
+            // the dispatcher is not getting them, it is because the consumer
+            // is either not connected or slow.
+            // We don't need to keep retriggering the timer. When the consumer
+            // catches up, the dispatcher will do the readMoreEntries() and
+            // get these messages
+            return;
+        }
+
+        // Compute the earliest time that we schedule the timer to run.
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), 
calculatedDelayMillis);
+        }
+
+        // Even though we may delay longer than this timestamp because of the 
tick delay, we still track the
+        // current timeout with reference to the next message's timestamp.
+        currentTimeoutTarget = timestamp;
+        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Timer triggered", dispatcher.getName());
+        }
+        if (timeout == null || timeout.isCancelled()) {
+            return;
+        }
+
+        synchronized (dispatcher) {
+            lastTickRun = clock.millis();
+            currentTimeoutTarget = -1;
+            this.timeout = null;
+            dispatcher.readMoreEntries();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (timeout != null) {
+            timeout.cancel();
+            timeout = null;
+        }
+    }
+
+    protected abstract long nextDeliveryTime();
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index da28ff19234..f55d5fd1169 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -18,47 +18,28 @@
  */
 package org.apache.pulsar.broker.delayed;
 
-import io.netty.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timer;
-import io.netty.util.TimerTask;
 import java.time.Clock;
 import java.util.NavigableSet;
 import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 
 @Slf4j
-public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, 
TimerTask {
+public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker {
 
     protected final TripleLongPriorityQueue priorityQueue = new 
TripleLongPriorityQueue();
 
-    private final PersistentDispatcherMultipleConsumers dispatcher;
-
-    // Reference to the shared (per-broker) timer for delayed delivery
-    private final Timer timer;
-
-    // Current timeout or null if not set
-    protected Timeout timeout;
-
-    // Timestamp at which the timeout is currently set
-    private long currentTimeoutTarget;
-
-    // Last time the TimerTask was triggered for this class
-    private long lastTickRun;
-
-    private long tickTimeMillis;
-
-    private final Clock clock;
-
-    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
-
     // If we detect that all messages have fixed delay time, such that the 
delivery is
     // always going to be in FIFO order, then we can avoid pulling all the 
messages in
     // tracker. Instead, we use the lookahead for detection and pause the read 
from
     // the cursor if the delays are fixed.
+    @Getter
+    @VisibleForTesting
     private final long fixedDelayDetectionLookahead;
 
     // This is the timestamp of the message with the highest delivery time
@@ -76,33 +57,14 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
                 fixedDelayDetectionLookahead);
     }
 
-    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
+    public 
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                    long tickTimeMillis, Clock clock,
                                    boolean 
isDelayedDeliveryDeliverAtTimeStrict,
                                    long fixedDelayDetectionLookahead) {
-        this.dispatcher = dispatcher;
-        this.timer = timer;
-        this.tickTimeMillis = tickTimeMillis;
-        this.clock = clock;
-        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
     }
 
-    /**
-     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow 
for early delivery by as much as the
-     * {@link #tickTimeMillis} because it is a slight optimization to let 
messages skip going back into the delay
-     * tracker for a brief amount of time when we're already trying to 
dispatch to the consumer.
-     *
-     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the 
current time to determine when messages
-     * can be delivered. As a consequence, there are two delays that will 
affect delivery. The first is the
-     * {@link #tickTimeMillis} and the second is the {@link Timer}'s 
granularity.
-     *
-     * @return the cutoff time to determine whether a message is ready to 
deliver to the consumer
-     */
-    private long getCutoffTime() {
-        return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : 
clock.millis() + tickTimeMillis;
-    }
-
     @Override
     public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
         if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
@@ -115,19 +77,24 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
                     deliverAt - clock.millis());
         }
 
-
         priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
 
-        // Check that new delivery time comes after the current highest, or at
-        // least within a single tick time interval of 1 second.
+        checkAndUpdateHighest(deliverAt);
+
+        return true;
+    }
+
+    /**
+     * Check that new delivery time comes after the current highest, or at
+     * least within a single tick time interval of 1 second.
+     */
+    private void checkAndUpdateHighest(long deliverAt) {
         if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
             messagesHaveFixedDelay = false;
         }
 
         highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, 
deliverAt);
-
-        return true;
     }
 
     /**
@@ -179,14 +146,6 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         return positions;
     }
 
-    @Override
-    public void resetTickTime(long tickTime) {
-
-        if (this.tickTimeMillis != tickTime) {
-            this.tickTimeMillis = tickTime;
-        }
-    }
-
     @Override
     public void clear() {
         this.priorityQueue.clear();
@@ -202,87 +161,9 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         return priorityQueue.bytesCapacity();
     }
 
-    /**
-     * Update the scheduled timer task such that:
-     * 1. If there are no delayed messages, return and do not schedule a timer 
task.
-     * 2. If the next message in the queue has the same deliverAt time as the 
timer task, return and leave existing
-     *    timer task in place.
-     * 3. If the deliverAt time for the next delayed message has already 
passed (i.e. the delay is negative), return
-     *    without scheduling a timer task since the subscription is backlogged.
-     * 4. Else, schedule a timer task where the delay is the greater of these 
two: the next message's deliverAt time or
-     *    the last tick time plus the tickTimeMillis (to ensure we do not 
schedule the task more frequently than the
-     *    tickTimeMillis).
-     */
-    private void updateTimer() {
-        if (priorityQueue.isEmpty()) {
-            if (timeout != null) {
-                currentTimeoutTarget = -1;
-                timeout.cancel();
-                timeout = null;
-            }
-            return;
-        }
-
-        long timestamp = priorityQueue.peekN1();
-        if (timestamp == currentTimeoutTarget) {
-            // The timer is already set to the correct target time
-            return;
-        }
-
-        if (timeout != null) {
-            timeout.cancel();
-        }
-
-        long now = clock.millis();
-        long delayMillis = timestamp - now;
-
-        if (delayMillis < 0) {
-            // There are messages that are already ready to be delivered. If
-            // the dispatcher is not getting them is because the consumer is
-            // either not connected or slow.
-            // We don't need to keep retriggering the timer. When the consumer
-            // catches up, the dispatcher will do the readMoreEntries() and
-            // get these messages
-            return;
-        }
-
-        // Compute the earliest time that we schedule the timer to run.
-        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
-        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
-
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), 
calculatedDelayMillis);
-        }
-
-        // Even though we may delay longer than this timestamp because of the 
tick delay, we still track the
-        // current timeout with reference to the next message's timestamp.
-        currentTimeoutTarget = timestamp;
-        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void run(Timeout timeout) throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Timer triggered", dispatcher.getName());
-        }
-        if (timeout == null || timeout.isCancelled()) {
-            return;
-        }
-
-        synchronized (dispatcher) {
-            lastTickRun = clock.millis();
-            currentTimeoutTarget = -1;
-            this.timeout = null;
-            dispatcher.readMoreEntries();
-        }
-    }
-
     @Override
     public void close() {
-        if (timeout != null) {
-            timeout.cancel();
-            timeout = null;
-        }
+        super.close();
         priorityQueue.close();
     }
 
@@ -291,7 +172,7 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         // Pause deliveries if we know all delays are fixed within the 
lookahead window
         return fixedDelayDetectionLookahead > 0
                 && messagesHaveFixedDelay
-                && priorityQueue.size() >= fixedDelayDetectionLookahead
+                && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
                 && !hasMessageAvailable();
     }
 
@@ -299,4 +180,8 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
     public boolean containsMessage(long ledgerId, long entryId) {
         return false;
     }
+
+    protected long nextDeliveryTime() {
+        return priorityQueue.peekN1();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
new file mode 100644
index 00000000000..fbd6d765705
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.roaringbitmap.RoaringBitmap;
+
+@Slf4j
+@Data
+@AllArgsConstructor
+abstract class Bucket {
+
+    static final String DELAYED_BUCKET_KEY_PREFIX = 
"#pulsar.internal.delayed.bucket";
+    static final String DELIMITER = "_";
+    static final int MaxRetryTimes = 3;
+
+    protected final ManagedCursor cursor;
+    protected final BucketSnapshotStorage bucketSnapshotStorage;
+
+    long startLedgerId;
+    long endLedgerId;
+
+    Map<Long, RoaringBitmap> delayedIndexBitMap;
+
+    long numberBucketDelayedMessages;
+
+    int lastSegmentEntryId;
+
+    int currentSegmentEntryId;
+
+    long snapshotLength;
+
+    private volatile Long bucketId;
+
+    private volatile CompletableFuture<Long> snapshotCreateFuture;
+
+
+    Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long 
startLedgerId, long endLedgerId) {
+        this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, 
-1, 0, 0, null, null);
+    }
+
+    boolean containsMessage(long ledgerId, long entryId) {
+        RoaringBitmap bitSet = delayedIndexBitMap.get(ledgerId);
+        if (bitSet == null) {
+            return false;
+        }
+        return bitSet.contains(entryId, entryId + 1);
+    }
+
+    void putIndexBit(long ledgerId, long entryId) {
+        delayedIndexBitMap.computeIfAbsent(ledgerId, k -> new 
RoaringBitmap()).add(entryId, entryId + 1);
+    }
+
+    boolean removeIndexBit(long ledgerId, long entryId) {
+        boolean contained = false;
+        RoaringBitmap bitSet = delayedIndexBitMap.get(ledgerId);
+        if (bitSet != null && bitSet.contains(entryId, entryId + 1)) {
+            contained = true;
+            bitSet.remove(entryId, entryId + 1);
+
+            if (bitSet.isEmpty()) {
+                delayedIndexBitMap.remove(ledgerId);
+            }
+
+            if (numberBucketDelayedMessages > 0) {
+                numberBucketDelayedMessages--;
+            }
+        }
+        return contained;
+    }
+
+    String bucketKey() {
+        return String.join(DELIMITER, DELAYED_BUCKET_KEY_PREFIX, 
String.valueOf(startLedgerId),
+                String.valueOf(endLedgerId));
+    }
+
+    Optional<CompletableFuture<Long>> getSnapshotCreateFuture() {
+        return Optional.ofNullable(snapshotCreateFuture);
+    }
+
+    Optional<Long> getBucketId() {
+        return Optional.ofNullable(bucketId);
+    }
+
+    long getAndUpdateBucketId() {
+        Optional<Long> bucketIdOptional = getBucketId();
+        if (bucketIdOptional.isPresent()) {
+            return bucketIdOptional.get();
+        }
+
+        String bucketIdStr = cursor.getCursorProperties().get(bucketKey());
+        long bucketId = Long.parseLong(bucketIdStr);
+        setBucketId(bucketId);
+        return bucketId;
+    }
+
+    CompletableFuture<Long> asyncSaveBucketSnapshot(
+            ImmutableBucket bucketState, 
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
+            List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments) {
+
+        return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, 
bucketSnapshotSegments)
+                .thenCompose(newBucketId -> {
+                    bucketState.setBucketId(newBucketId);
+                    String bucketKey = bucketState.bucketKey();
+                    return putBucketKeyId(bucketKey, 
newBucketId).exceptionally(ex -> {
+                        log.warn("Failed to record bucketId to cursor 
property, bucketKey: {}", bucketKey);
+                        return null;
+                    }).thenApply(__ -> newBucketId);
+                });
+    }
+
+    private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long 
bucketId) {
+        Objects.requireNonNull(bucketId);
+        return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, 
String.valueOf(bucketId)),
+                ManagedLedgerException.BadVersionException.class, 
MaxRetryTimes);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
new file mode 100644
index 00000000000..b7f0e0a1bc1
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeRangeMap;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import java.time.Clock;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+
+@Slf4j
+@ThreadSafe
+public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker {
+
+    static final int AsyncOperationTimeoutSeconds = 30;
+
+    private final long minIndexCountPerBucket;
+
+    private final long timeStepPerBucketSnapshotSegment;
+
+    private final int maxNumBuckets;
+
+    private long numberDelayedMessages;
+
+    private final MutableBucket lastMutableBucket;
+
+    private final TripleLongPriorityQueue sharedBucketPriorityQueue;
+
+    private final RangeMap<Long, ImmutableBucket> immutableBuckets;
+
+    private final Table<Long, Long, ImmutableBucket> 
snapshotSegmentLastIndexTable;
+
+    /**
+     * Creates a tracker that uses the system UTC clock; all other arguments are forwarded unchanged
+     * to the constructor that takes an explicit {@link Clock}.
+     */
+    public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
+                                 Timer timer, long tickTimeMillis,
+                                 boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                 BucketSnapshotStorage bucketSnapshotStorage,
+                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegment,
+                                 int maxNumBuckets) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+                bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegment, maxNumBuckets);
+    }
+
+    /**
+     * Creates a tracker with an explicit clock.
+     *
+     * @param dispatcher                           dispatcher whose cursor backs the bucket snapshots
+     * @param timer                                timer passed to the parent tracker
+     * @param tickTimeMillis                       tick time passed to the parent tracker
+     * @param clock                                clock passed to the parent tracker
+     * @param isDelayedDeliveryDeliverAtTimeStrict strictness flag passed to the parent tracker
+     * @param bucketSnapshotStorage                storage used by the mutable bucket to persist snapshots
+     * @param minIndexCountPerBucket               minimum size of the mutable bucket before it may be sealed
+     * @param timeStepPerBucketSnapshotSegment     time step handed to the mutable bucket when sealing
+     * @param maxNumBuckets                        bucket-count threshold (merging is still a TODO)
+     */
+    public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
+                                 Timer timer, long tickTimeMillis, Clock clock,
+                                 boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                 BucketSnapshotStorage bucketSnapshotStorage,
+                                 long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegment,
+                                 int maxNumBuckets) {
+        super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
+
+        // Bucketing thresholds
+        this.maxNumBuckets = maxNumBuckets;
+        this.timeStepPerBucketSnapshotSegment = timeStepPerBucketSnapshotSegment;
+        this.minIndexCountPerBucket = minIndexCountPerBucket;
+
+        // Tracking state, initially empty
+        this.numberDelayedMessages = 0L;
+        this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
+        this.snapshotSegmentLastIndexTable = HashBasedTable.create();
+        this.immutableBuckets = TreeRangeMap.create();
+        this.lastMutableBucket = new MutableBucket(dispatcher.getCursor(), bucketSnapshotStorage);
+    }
+
+    /**
+     * Timer callback. Unless the timeout is {@code null} or cancelled, moves every message of the
+     * mutable bucket that is due at the current cutoff time into the shared queue and then delegates
+     * to the parent implementation.
+     */
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        synchronized (this) {
+            final boolean stale = timeout == null || timeout.isCancelled();
+            if (stale) {
+                return;
+            }
+            final long cutoff = getCutoffTime();
+            lastMutableBucket.moveScheduledMessageToSharedQueue(cutoff, sharedBucketPriorityQueue);
+        }
+        super.run(timeout);
+    }
+
+    private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) {
+        if (immutableBuckets.asMapOfRanges().isEmpty()) {
+            return Optional.empty();
+        }
+
+        return Optional.ofNullable(immutableBuckets.get(ledgerId));
+    }
+
+    private void sealBucket() {
+        Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+                
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+                        this.sharedBucketPriorityQueue);
+        if (immutableBucketDelayedIndexPair != null) {
+            ImmutableBucket immutableBucket = 
immutableBucketDelayedIndexPair.getLeft();
+            immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId),
+                    immutableBucket);
+
+            DelayedIndex lastDelayedIndex = 
immutableBucketDelayedIndexPair.getRight();
+            snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), 
lastDelayedIndex.getEntryId(),
+                    immutableBucket);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Create bucket snapshot, bucket: {}", 
dispatcher.getName(),
+                        lastMutableBucket);
+            }
+        }
+    }
+
+    /**
+     * Starts tracking a delayed message.
+     *
+     * <p>When the message belongs to a ledger beyond the mutable bucket's range, no immutable bucket
+     * covers that ledger, and the mutable bucket is non-empty and holds at least
+     * {@code minIndexCountPerBucket} entries, the mutable bucket is sealed first. Messages that fall
+     * into an already sealed ledger range (or before the mutable bucket's start) go straight into the
+     * shared queue; all others are added to the mutable bucket.
+     *
+     * @param ledgerId  ledger id of the message
+     * @param entryId   entry id of the message
+     * @param deliverAt delivery timestamp of the message
+     * @return {@code true} if the message is now (or was already) tracked, {@code false} if
+     *         {@code deliverAt} is negative or not later than the current cutoff time
+     */
+    @Override
+    public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+        if (containsMessage(ledgerId, entryId)) {
+            return true;
+        }
+
+        if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+            return false;
+        }
+
+        boolean existBucket = findImmutableBucket(ledgerId).isPresent();
+
+        // Create bucket snapshot
+        if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
+                && lastMutableBucket.size() >= minIndexCountPerBucket
+                && !lastMutableBucket.isEmpty()) {
+            sealBucket();
+            lastMutableBucket.resetLastMutableBucketRange();
+
+            if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+                // TODO merge bucket snapshot (synchronize operate)
+            }
+        }
+
+        if (ledgerId < lastMutableBucket.startLedgerId || existBucket) {
+            // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range,
+            // enter sharedBucketPriorityQueue directly
+            sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+            // Record the index bit as well: otherwise containsMessage() cannot see this message and a
+            // repeated add would enqueue and count it a second time
+            lastMutableBucket.putIndexBit(ledgerId, entryId);
+        } else {
+            checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
+            lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
+        }
+
+        numberDelayedMessages++;
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
+                    deliverAt - clock.millis());
+        }
+
+        updateTimer();
+
+        return true;
+    }
+
+    /**
+     * Tells whether at least one tracked message is due at the current cutoff time. When none is
+     * due, the timer is updated before returning.
+     */
+    @Override
+    public synchronized boolean hasMessageAvailable() {
+        final long cutoffTime = getCutoffTime();
+
+        if (getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime) {
+            return true;
+        }
+
+        updateTimer();
+        return false;
+    }
+
+    @Override
+    protected long nextDeliveryTime() {
+        if (lastMutableBucket.isEmpty() && 
!sharedBucketPriorityQueue.isEmpty()) {
+            return sharedBucketPriorityQueue.peekN1();
+        } else if (sharedBucketPriorityQueue.isEmpty() && 
!lastMutableBucket.isEmpty()) {
+            return lastMutableBucket.nextDeliveryTime();
+        }
+        long timestamp = lastMutableBucket.nextDeliveryTime();
+        long bucketTimestamp = sharedBucketPriorityQueue.peekN1();
+        return Math.min(timestamp, bucketTimestamp);
+    }
+
+    /**
+     * Returns the counter that is incremented by {@code addMessage} and decremented for every
+     * position handed out by {@code getScheduledMessages}.
+     */
+    @Override
+    public synchronized long getNumberOfDelayedMessages() {
+        return numberDelayedMessages;
+    }
+
+    /**
+     * Returns the sum of the mutable bucket's buffer usage and the byte capacity of the shared queue.
+     */
+    @Override
+    public synchronized long getBufferMemoryUsage() {
+        final long mutableBucketBytes = this.lastMutableBucket.getBufferMemoryUsage();
+        final long sharedQueueBytes = sharedBucketPriorityQueue.bytesCapacity();
+        return mutableBucketBytes + sharedQueueBytes;
+    }
+
+    /**
+     * Collects up to {@code maxMessages} positions whose delivery time is not later than the current
+     * cutoff time.
+     *
+     * <p>Due messages of the mutable bucket are first moved into the shared queue. Each popped message
+     * has its index bit removed; if it was registered as the last index of a snapshot segment, the
+     * next segment of that bucket is loaded (blocking for at most
+     * {@code AsyncOperationTimeoutSeconds}) and its indexes are pushed into the shared queue.
+     *
+     * @param maxMessages maximum number of positions to return
+     * @return the scheduled positions, sorted
+     * @throws RuntimeException if loading the next snapshot segment fails, times out or is interrupted
+     *                          (the interrupt flag is restored in the latter case)
+     */
+    @Override
+    public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
+        long cutoffTime = getCutoffTime();
+
+        lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
+
+        NavigableSet<PositionImpl> positions = new TreeSet<>();
+        int n = maxMessages;
+
+        while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) {
+            long timestamp = sharedBucketPriorityQueue.peekN1();
+            if (timestamp > cutoffTime) {
+                break;
+            }
+
+            long ledgerId = sharedBucketPriorityQueue.peekN2();
+            long entryId = sharedBucketPriorityQueue.peekN3();
+            positions.add(new PositionImpl(ledgerId, entryId));
+
+            sharedBucketPriorityQueue.pop();
+            removeIndexBit(ledgerId, entryId);
+
+            // Account for the popped message right away so the counter stays in sync with the queue
+            // even if loading the next segment below throws
+            --n;
+            --numberDelayedMessages;
+
+            ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
+            if (bucket != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket);
+                }
+                // All message of current snapshot segment are scheduled, load next snapshot segment
+                // TODO make it asynchronous and not blocking this process
+                try {
+                    bucket.asyncLoadNextBucketSnapshotEntry(false).thenAccept(indexList -> {
+                        if (CollectionUtils.isEmpty(indexList)) {
+                            return;
+                        }
+                        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
+                                lastDelayedIndex = indexList.get(indexList.size() - 1);
+                        this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
+                                lastDelayedIndex.getEntryId(), bucket);
+                        for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
+                            sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
+                                    index.getEntryId());
+                        }
+                    }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag so code further up the stack can still observe it
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException | TimeoutException e) {
+                    // TODO make this segment load again
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        updateTimer();
+
+        return positions;
+    }
+
+    /** Always returns {@code false}: this tracker never asks for all deliveries to be paused. */
+    @Override
+    public boolean shouldPauseAllDeliveries() {
+        return false;
+    }
+
+    /**
+     * Drops the tracked state: clears and removes every immutable bucket (in delete mode), clears the
+     * shared queue, the mutable bucket and the segment-last-index table, and resets the message
+     * counter to zero.
+     */
+    @Override
+    public synchronized void clear() {
+        cleanImmutableBuckets(true);
+        sharedBucketPriorityQueue.clear();
+        lastMutableBucket.clear();
+        snapshotSegmentLastIndexTable.clear();
+        numberDelayedMessages = 0;
+    }
+
+    /**
+     * Closes the parent tracker, the mutable bucket and the shared queue. Immutable buckets are
+     * cleared in non-delete mode, i.e. without asking for their snapshots to be deleted.
+     */
+    @Override
+    public synchronized void close() {
+        super.close();
+        lastMutableBucket.close();
+        cleanImmutableBuckets(false);
+        sharedBucketPriorityQueue.close();
+    }
+
+    /**
+     * Clears every immutable bucket and removes it from the range map.
+     *
+     * @param delete forwarded to {@code ImmutableBucket.clear(boolean)} for each bucket
+     */
+    private void cleanImmutableBuckets(boolean delete) {
+        if (immutableBuckets == null) {
+            return;
+        }
+
+        for (Iterator<ImmutableBucket> it = immutableBuckets.asMapOfRanges().values().iterator(); it.hasNext();) {
+            it.next().clear(delete);
+            it.remove();
+        }
+    }
+
+    /**
+     * Removes the index bit of a message, trying the mutable bucket first and, if that reports
+     * {@code false}, the immutable bucket covering {@code ledgerId}.
+     *
+     * @return the result reported by the bucket that handled the removal, {@code false} if none did
+     */
+    private boolean removeIndexBit(long ledgerId, long entryId) {
+        return lastMutableBucket.removeIndexBit(ledgerId, entryId)
+                || findImmutableBucket(ledgerId)
+                        .map(owner -> owner.removeIndexBit(ledgerId, entryId))
+                        .orElse(false);
+    }
+
+    /**
+     * Tells whether a message is already tracked, asking the mutable bucket first and then the
+     * immutable bucket covering {@code ledgerId}, if any.
+     *
+     * <p>Synchronized like the other public accessors of this class, because it reads bucket state
+     * that the synchronized mutators modify.
+     *
+     * @param ledgerId ledger id of the message
+     * @param entryId  entry id of the message
+     * @return {@code true} if one of the buckets reports the message as contained
+     */
+    @Override
+    public synchronized boolean containsMessage(long ledgerId, long entryId) {
+        if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
+            return true;
+        }
+
+        return findImmutableBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, entryId))
+                .orElse(false);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
similarity index 98%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketSnapshotStorage.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index 7e5fa633dd9..3ab4ce1ad27 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.delayed;
+package org.apache.pulsar.broker.delayed.bucket;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
new file mode 100644
index 00000000000..833030c5751
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static 
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.collections4.CollectionUtils;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+
+@Slf4j
+class ImmutableBucket extends Bucket {
+    /** Creates a bucket for the given ledger range; all arguments are passed to the parent class. */
+    ImmutableBucket(ManagedCursor cursor, BucketSnapshotStorage storage, long 
startLedgerId, long endLedgerId) {
+        super(cursor, storage, startLedgerId, endLedgerId);
+    }
+
+    /**
+     * Asynchronously loads the next snapshot segment of this bucket.
+     *
+     * <p>The load waits for a pending snapshot creation (if any), resolves the bucket id and then
+     * reads the segment following {@code currentSegmentEntryId}. On success the current segment entry
+     * id is advanced.
+     *
+     * @param isRecover whether used to recover bucket snapshot; recovery is not implemented yet, so
+     *                  {@code true} makes the returned future fail with
+     *                  {@link UnsupportedOperationException} instead of never completing
+     * @return future with the indexes of the next segment; completes with {@code null} when there is
+     *         no further segment and with an empty list when the storage returns no segment
+     */
+    CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(boolean isRecover) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Load next bucket snapshot data, bucket: {}", cursor.getName(), this);
+        }
+
+        // Wait bucket snapshot create finish
+        CompletableFuture<Void> snapshotCreateFuture =
+                getSnapshotCreateFuture().orElseGet(() -> CompletableFuture.completedFuture(null))
+                        .thenApply(__ -> null);
+
+        return snapshotCreateFuture.thenCompose(__ -> {
+            final long bucketId = getAndUpdateBucketId();
+            CompletableFuture<Integer> loadMetaDataFuture = new CompletableFuture<>();
+            if (isRecover) {
+                // TODO Recover bucket snapshot
+                // Fail fast: leaving the future incomplete would hang every caller forever
+                loadMetaDataFuture.completeExceptionally(
+                        new UnsupportedOperationException("Recovering a bucket snapshot is not supported yet"));
+            } else {
+                loadMetaDataFuture.complete(currentSegmentEntryId + 1);
+            }
+
+            return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> {
+                if (nextSegmentEntryId > lastSegmentEntryId) {
+                    // TODO Delete bucket snapshot
+                    return CompletableFuture.completedFuture(null);
+                }
+
+                return bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, nextSegmentEntryId)
+                        .thenApply(bucketSnapshotSegments -> {
+                            if (CollectionUtils.isEmpty(bucketSnapshotSegments)) {
+                                return Collections.emptyList();
+                            }
+
+                            DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
+                                    bucketSnapshotSegments.get(0);
+                            List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
+                                    snapshotSegment.getIndexesList();
+                            this.setCurrentSegmentEntryId(nextSegmentEntryId);
+                            return indexList;
+                        });
+            });
+        });
+    }
+
+    /**
+     * Clears the in-memory index bitmap and settles a pending snapshot creation, if there is one.
+     *
+     * @param delete when {@code true} the pending snapshot creation is cancelled; when {@code false}
+     *               the call waits up to {@code AsyncOperationTimeoutSeconds} for it to finish and
+     *               only logs a failure
+     */
+    void clear(boolean delete) {
+        delayedIndexBitMap.clear();
+        getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
+            if (delete) {
+                snapshotGenerateFuture.cancel(true);
+                // TODO delete bucket snapshot
+            } else {
+                try {
+                    snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    if (e instanceof InterruptedException) {
+                        // Keep the interrupt visible to the caller instead of swallowing it
+                        Thread.currentThread().interrupt();
+                    }
+                    log.warn("Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", getBucketId(),
+                            bucketKey(), e);
+                }
+            }
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
new file mode 100644
index 00000000000..36026298269
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.RoaringBitmap;
+
+@Slf4j
+class MutableBucket extends Bucket implements AutoCloseable {
+
+    private final TripleLongPriorityQueue priorityQueue;
+
+    /**
+     * Creates an empty mutable bucket. The ledger range starts as {@code -1/-1}, the value this class
+     * uses for "no range yet", and a fresh priority queue is allocated.
+     */
+    MutableBucket(ManagedCursor cursor,
+                  BucketSnapshotStorage bucketSnapshotStorage) {
+        super(cursor, bucketSnapshotStorage, -1L, -1L);
+        this.priorityQueue = new TripleLongPriorityQueue();
+    }
+
+    /**
+     * Drains this bucket's queue into snapshot segments, creates the corresponding
+     * {@link ImmutableBucket} and starts persisting the snapshot asynchronously.
+     *
+     * <p>Entries are consumed in queue order. A segment is closed once the queue is empty or the next
+     * timestamp lies beyond {@code first timestamp of the segment + timeStepPerBucketSnapshotSegment - 1}.
+     * The entries of the first segment are additionally copied into {@code sharedQueue}.
+     *
+     * @param timeStepPerBucketSnapshotSegment time span covered by one snapshot segment
+     * @param sharedQueue                      queue that receives the entries of the first segment
+     * @return the new immutable bucket paired with the last index of its first segment, or
+     *         {@code null} if this bucket's queue is empty
+     */
+    Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
+            long timeStepPerBucketSnapshotSegment,
+            TripleLongPriorityQueue sharedQueue) {
+        if (priorityQueue.isEmpty()) {
+            return null;
+        }
+        long numMessages = 0;
+
+        final long startLedgerId = getStartLedgerId();
+        final long endLedgerId = getEndLedgerId();
+
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
+        Map<Long, RoaringBitmap> bitMap = new HashMap<>();
+        SnapshotSegment.Builder snapshotSegmentBuilder = 
SnapshotSegment.newBuilder();
+        SnapshotSegmentMetadata.Builder segmentMetadataBuilder = 
SnapshotSegmentMetadata.newBuilder();
+
+        // 0 means "no segment open"; set from the first timestamp of each new segment
+        long currentTimestampUpperLimit = 0;
+        while (!priorityQueue.isEmpty()) {
+            long timestamp = priorityQueue.peekN1();
+            if (currentTimestampUpperLimit == 0) {
+                currentTimestampUpperLimit = timestamp + 
timeStepPerBucketSnapshotSegment - 1;
+            }
+
+            long ledgerId = priorityQueue.peekN2();
+            long entryId = priorityQueue.peekN3();
+
+            checkArgument(ledgerId >= startLedgerId && ledgerId <= 
endLedgerId);
+
+            // Move first segment of bucket snapshot to 
sharedBucketPriorityQueue
+            // (no segment metadata has been completed yet while the first segment is being built)
+            if (segmentMetadataList.size() == 0) {
+                sharedQueue.add(timestamp, ledgerId, entryId);
+            }
+
+            priorityQueue.pop();
+            numMessages++;
+
+            DelayedIndex delayedIndex = DelayedIndex.newBuilder()
+                    .setTimestamp(timestamp)
+                    .setLedgerId(ledgerId)
+                    .setEntryId(entryId).build();
+
+            // Per-ledger bitmap of the entry ids contained in the segment being built
+            bitMap.computeIfAbsent(ledgerId, k -> new 
RoaringBitmap()).add(entryId, entryId + 1);
+
+            snapshotSegmentBuilder.addIndexes(delayedIndex);
+
+            // Close the segment when the queue is drained or the next entry is past the time limit
+            if (priorityQueue.isEmpty() || priorityQueue.peekN1() > 
currentTimestampUpperLimit) {
+                segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
+                currentTimestampUpperLimit = 0;
+
+                // Serialize each bitmap into the segment metadata and empty the map for the next segment
+                Iterator<Map.Entry<Long, RoaringBitmap>> iterator = 
bitMap.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<Long, RoaringBitmap> entry = iterator.next();
+                    byte[] array = new 
byte[entry.getValue().serializedSizeInBytes()];
+                    entry.getValue().serialize(ByteBuffer.wrap(array));
+                    
segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), 
ByteString.copyFrom(array));
+                    iterator.remove();
+                }
+
+                segmentMetadataList.add(segmentMetadataBuilder.build());
+                segmentMetadataBuilder.clear();
+
+                bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
+                snapshotSegmentBuilder.clear();
+            }
+        }
+
+        SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder()
+                .addAllMetadataList(segmentMetadataList)
+                .build();
+
+        final int lastSegmentEntryId = segmentMetadataList.size();
+
+        ImmutableBucket bucket = new ImmutableBucket(cursor, 
bucketSnapshotStorage, startLedgerId, endLedgerId);
+        // NOTE(review): segment entry ids appear to be 1-based (current = 1, last = segment count) - confirm
+        bucket.setCurrentSegmentEntryId(1);
+        bucket.setNumberBucketDelayedMessages(numMessages);
+        bucket.setLastSegmentEntryId(lastSegmentEntryId);
+
+        // Return the last index of the first snapshot segment together with the new bucket
+        checkArgument(!bucketSnapshotSegments.isEmpty());
+        SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
+        DelayedIndex lastDelayedIndex = 
snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
+        Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, 
lastDelayedIndex);
+
+        CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
+                bucketSnapshotMetadata, bucketSnapshotSegments);
+        bucket.setSnapshotCreateFuture(future);
+        // On success the future reference is dropped; on failure it stays set on the bucket
+        future.whenComplete((__, ex) -> {
+            if (ex == null) {
+                bucket.setSnapshotCreateFuture(null);
+            } else {
+                //TODO Record create snapshot failed
+                log.error("Failed to create snapshot: ", ex);
+            }
+        });
+
+        return result;
+    }
+
+    void moveScheduledMessageToSharedQueue(long cutoffTime, 
TripleLongPriorityQueue sharedBucketPriorityQueue) {
+        while (!priorityQueue.isEmpty()) {
+            long timestamp = priorityQueue.peekN1();
+            if (timestamp > cutoffTime) {
+                break;
+            }
+
+            long ledgerId = priorityQueue.peekN2();
+            long entryId = priorityQueue.peekN3();
+            sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId);
+
+            priorityQueue.pop();
+        }
+    }
+
+    /** Resets both ends of the ledger range to {@code -1}, the "no range yet" value. */
+    void resetLastMutableBucketRange() {
+        this.startLedgerId = -1L;
+        this.endLedgerId = -1L;
+    }
+
+    /**
+     * Resets this bucket to its empty state: ledger range, index bitmap and queued entries.
+     *
+     * <p>The queue has to be emptied as well; otherwise stale entries would survive a tracker-level
+     * clear and still be scheduled or sealed afterwards.
+     */
+    void clear() {
+        this.resetLastMutableBucketRange();
+        this.delayedIndexBitMap.clear();
+        this.priorityQueue.clear();
+    }
+
+    /** Closes the underlying priority queue. */
+    @Override
+    public void close() {
+        priorityQueue.close();
+    }
+
+    /** Returns the byte capacity reported by this bucket's priority queue. */
+    long getBufferMemoryUsage() {
+        return priorityQueue.bytesCapacity();
+    }
+
+    /** Tells whether this bucket's priority queue holds no entries. */
+    boolean isEmpty() {
+        return priorityQueue.isEmpty();
+    }
+
+    /**
+     * Returns the delivery timestamp at the head of this bucket's priority queue.
+     * NOTE(review): presumably requires a non-empty queue - confirm against TripleLongPriorityQueue#peekN1.
+     */
+    long nextDeliveryTime() {
+        return priorityQueue.peekN1();
+    }
+
+    /** Returns the number of entries currently held in this bucket's priority queue. */
+    long size() {
+        return priorityQueue.size();
+    }
+
+    void addMessage(long ledgerId, long entryId, long deliverAt) {
+        priorityQueue.add(deliverAt, ledgerId, entryId);
+        if (startLedgerId == -1L) {
+            this.startLedgerId = ledgerId;
+        }
+        this.endLedgerId = ledgerId;
+        putIndexBit(ledgerId, entryId);
+    }
+}
diff --git 
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
similarity index 59%
copy from 
pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
index eda9a8f92e6..0ab45ecded2 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,27 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-syntax = "proto2";
-
-package pulsar.delay;
-option java_package = "org.apache.pulsar.broker.delayed.proto";
-option optimize_for = SPEED;
-
-message DelayedIndex {
-    required uint64 timestamp = 1;
-    required int64 ledger_id = 2;
-    required int64 entry_id = 3;
-}
-
-message SnapshotSegmentMetadata {
-    map<uint64, bytes> delayed_index_bit_map = 1;
-    required uint64 max_schedule_timestamp = 2;
-}
-
-message SnapshotSegment {
-    repeated DelayedIndex indexes = 1;
-}
-
-message SnapshotMetadata {
-    repeated SnapshotSegmentMetadata metadata_list = 1;
-}
+package org.apache.pulsar.broker.delayed.bucket;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b777087ba2f..193ee07561d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1120,6 +1120,10 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         return 0;
     }
 
+    /** Returns the {@code cursor} field of this dispatcher. */
+    public ManagedCursor getCursor() {
+        return cursor;
+    }
+
     protected int getStickyKeyHash(Entry entry) {
         return 
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
     }
diff --git 
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto 
b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
index eda9a8f92e6..8414a583fe5 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
@@ -24,8 +24,8 @@ option optimize_for = SPEED;
 
 message DelayedIndex {
     required uint64 timestamp = 1;
-    required int64 ledger_id = 2;
-    required int64 entry_id = 3;
+    required uint64 ledger_id = 2;
+    required uint64 entry_id = 3;
 }
 
 message SnapshotSegmentMetadata {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
new file mode 100644
index 00000000000..1d166a8db5c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Clock;
+import java.util.Collections;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+public abstract class AbstractDeliveryTrackerTest {
+
+    // Create a single shared timer for the test.
+    protected final Timer timer =
+            new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+                    500, TimeUnit.MILLISECONDS);
+    protected PersistentDispatcherMultipleConsumers dispatcher;
+    protected Clock clock;
+
+    protected AtomicLong clockTime;
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        timer.stop();
+    }
+
+    @Test(dataProvider = "delayedTracker")
+    public void test(DelayedDeliveryTracker tracker) throws Exception {
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 2, 20));
+        assertTrue(tracker.addMessage(2, 1, 10));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 5, 50));
+        assertTrue(tracker.addMessage(5, 4, 40));
+
+        assertFalse(tracker.hasMessageAvailable());
+        assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+
+        assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
+
+        // Move time forward
+        clockTime.set(15);
+
+        // Message is rejected by tracker since it's already ready to send
+        assertFalse(tracker.addMessage(6, 6, 10));
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+        assertTrue(tracker.hasMessageAvailable());
+        Set<PositionImpl> scheduled = tracker.getScheduledMessages(10);
+        assertEquals(scheduled.size(), 1);
+
+        // Move time forward
+        clockTime.set(60);
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 4);
+        assertTrue(tracker.hasMessageAvailable());
+        scheduled = tracker.getScheduledMessages(1);
+        assertEquals(scheduled.size(), 1);
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 3);
+        assertTrue(tracker.hasMessageAvailable());
+        scheduled = tracker.getScheduledMessages(3);
+        assertEquals(scheduled.size(), 3);
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+        assertFalse(tracker.hasMessageAvailable());
+        assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
+
+        tracker.close();
+    }
+
+    @Test(dataProvider = "delayedTracker")
+    public void testWithTimer(DelayedDeliveryTracker tracker, 
NavigableMap<Long, TimerTask> tasks) throws Exception {
+        assertTrue(tasks.isEmpty());
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertEquals(tasks.size(), 1);
+        assertEquals(tasks.firstKey().longValue(), 20);
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertEquals(tasks.size(), 1);
+        assertEquals(tasks.firstKey().longValue(), 10);
+
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertEquals(tasks.size(), 1);
+        assertEquals(tasks.firstKey().longValue(), 10);
+
+        clockTime.set(15);
+
+        TimerTask task = tasks.pollFirstEntry().getValue();
+        Timeout cancelledTimeout = mock(Timeout.class);
+        when(cancelledTimeout.isCancelled()).thenReturn(true);
+        task.run(cancelledTimeout);
+        verify(dispatcher, atMostOnce()).readMoreEntries();
+
+        task.run(mock(Timeout.class));
+        verify(dispatcher).readMoreEntries();
+
+        tracker.close();
+    }
+
+    /**
+     * Adding a message that is about to expire within the tick time should lead
+     * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void testAddWithinTickTime(DelayedDeliveryTracker tracker) {
+        clockTime.set(0);
+
+        assertFalse(tracker.addMessage(1, 1, 10));
+        assertFalse(tracker.addMessage(2, 2, 99));
+        assertFalse(tracker.addMessage(3, 3, 100));
+        assertTrue(tracker.addMessage(4, 4, 101));
+        assertTrue(tracker.addMessage(5, 5, 200));
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 2);
+
+        tracker.close();
+    }
+
+    @Test(dataProvider = "delayedTracker")
+    public void testAddMessageWithStrictDelay(DelayedDeliveryTracker tracker) {
+        clockTime.set(10);
+
+        // Verify behavior for the less than, equal to, and greater than deliverAt times.
+        assertFalse(tracker.addMessage(1, 1, 9));
+        assertFalse(tracker.addMessage(4, 4, 10));
+        assertTrue(tracker.addMessage(1, 1, 11));
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+        assertFalse(tracker.hasMessageAvailable());
+
+        tracker.close();
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the
+     * tickTimeMillis determines the delay.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void 
testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict(DelayedDeliveryTracker
 tracker)
+            throws Exception {
+        // Set clock time, then run tracker to inherit clock time as the last tick time.
+        clockTime.set(10000);
+        Timeout timeout = mock(Timeout.class);
+        when(timeout.isCancelled()).then(x -> false);
+        ((AbstractDelayedDeliveryTracker) tracker).run(timeout);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Add a message that has a delivery time just after the previous run. It will get delivered based on the
+        // tick delay plus the last tick run.
+        assertTrue(tracker.addMessage(1, 1, 10001));
+
+        // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has
+        // passed where it would have been triggered if the tick time was doing the triggering.
+        Thread.sleep(600);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Now wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+
+        tracker.close();
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a
+     * recent tick run, the deliverAt time determines the delay.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void 
testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict(
+            DelayedDeliveryTracker tracker) {
+        clockTime.set(500000);
+
+        assertTrue(tracker.addMessage(1, 1, 500005));
+
+        // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery
+        // should get scheduled early when the tick duration has passed since the last tick.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+
+        tracker.close();
+    }
+
+    /**
+     * In this test, the deliverAt time is after now plus tickTimeMillis, so the tickTimeMillis determines the delay.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void 
testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict(DelayedDeliveryTracker
 tracker)
+            throws Exception {
+        clockTime.set(0);
+
+        assertTrue(tracker.addMessage(1, 1, 2000));
+
+        // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has
+        // passed where it would have been triggered if the tick time was doing the triggering.
+        Thread.sleep(1000);
+
+        // Now wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+
+        tracker.close();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
new file mode 100644
index 00000000000..331ceb83a99
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.lang.reflect.Method;
+import java.time.Clock;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
+
+    private BucketSnapshotStorage bucketSnapshotStorage;
+
+    @AfterMethod
+    public void clean() throws Exception {
+        if (bucketSnapshotStorage != null) {
+            bucketSnapshotStorage.close();
+        }
+    }
+
+    @DataProvider(name = "delayedTracker")
+    public Object[][] provider(Method method) throws Exception {
+        dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+        clock = mock(Clock.class);
+        clockTime = new AtomicLong();
+        when(clock.millis()).then(x -> clockTime.get());
+
+        bucketSnapshotStorage = new MockBucketSnapshotStorage();
+        bucketSnapshotStorage.start();
+        ManagedCursor cursor = new MockManagedCursor("my_test_cursor");
+        doReturn(cursor).when(dispatcher).getCursor();
+
+        final String methodName = method.getName();
+        return switch (methodName) {
+            case "test" -> new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                            false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+            }};
+            case "testWithTimer" -> {
+                Timer timer = mock(Timer.class);
+
+                AtomicLong clockTime = new AtomicLong();
+                Clock clock = mock(Clock.class);
+                when(clock.millis()).then(x -> clockTime.get());
+
+                NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
+
+                when(timer.newTimeout(any(), anyLong(), 
any())).then(invocation -> {
+                    TimerTask task = invocation.getArgument(0, 
TimerTask.class);
+                    long timeout = invocation.getArgument(1, Long.class);
+                    TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+                    long scheduleAt = clockTime.get() + unit.toMillis(timeout);
+                    tasks.put(scheduleAt, task);
+
+                    Timeout t = mock(Timeout.class);
+                    when(t.cancel()).then(i -> {
+                        tasks.remove(scheduleAt, task);
+                        return null;
+                    });
+                    return t;
+                });
+
+                yield new Object[][]{{
+                        new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                                false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50),
+                        tasks
+                }};
+            }
+            case "testAddWithinTickTime" -> new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
+                            false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+            }};
+            case "testAddMessageWithStrictDelay" -> new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
+                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+            }};
+            case 
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> 
new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
+                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+            }};
+            case 
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict", 
"testRecoverSnapshot" ->
+                    new Object[][]{{
+                            new BucketDelayedDeliveryTracker(dispatcher, 
timer, 100000, clock,
+                                    true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                    }};
+            case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict", 
"testExistDelayedMessage" ->
+                    new Object[][]{{
+                            new BucketDelayedDeliveryTracker(dispatcher, 
timer, 500, clock,
+                                    true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                    }};
+            default -> new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                            true, bucketSnapshotStorage, 1000, 
TimeUnit.MILLISECONDS.toMillis(100), 50)
+            }};
+        };
+    }
+
+    @Test(dataProvider = "delayedTracker")
+    public void testContainsMessage(DelayedDeliveryTracker tracker) {
+        tracker.addMessage(1, 1, 10);
+        tracker.addMessage(2, 2, 20);
+
+        assertTrue(tracker.containsMessage(1, 1));
+        clockTime.set(20);
+
+        Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(1);
+        
assertEquals(scheduledMessages.stream().findFirst().get().getEntryId(), 1);
+
+        tracker.addMessage(3, 3, 30);
+
+        tracker.addMessage(4, 4, 30);
+
+        tracker.addMessage(5, 5, 30);
+
+        tracker.addMessage(6, 6, 30);
+
+        assertTrue(tracker.containsMessage(3, 3));
+
+        tracker.close();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 1a98233c385..6711aed924c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -18,13 +18,9 @@
  */
 package org.apache.pulsar.broker.delayed;
 
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -35,297 +31,95 @@ import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Method;
 import java.time.Clock;
-import java.util.Collections;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import lombok.Cleanup;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
-import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class InMemoryDeliveryTrackerTest {
+public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
 
-    // Create a single shared timer for the test.
-    private final Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
-            500, TimeUnit.MILLISECONDS);
-
-    @AfterClass(alwaysRun = true)
-    public void cleanup() {
-        timer.stop();
-    }
-
-    @Test
-    public void test() throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
+    @DataProvider(name = "delayedTracker")
+    public Object[][] provider(Method method) throws Exception {
+        dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+        clock = mock(Clock.class);
+        clockTime = new AtomicLong();
         when(clock.millis()).then(x -> clockTime.get());
 
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                false, 0);
-
-        assertFalse(tracker.hasMessageAvailable());
-
-        assertTrue(tracker.addMessage(2, 2, 20));
-        assertTrue(tracker.addMessage(1, 1, 10));
-        assertTrue(tracker.addMessage(3, 3, 30));
-        assertTrue(tracker.addMessage(5, 5, 50));
-        assertTrue(tracker.addMessage(4, 4, 40));
-
-        assertFalse(tracker.hasMessageAvailable());
-        assertEquals(tracker.getNumberOfDelayedMessages(), 5);
-
-        assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
-
-        // Move time forward
-        clockTime.set(15);
-
-        // Message is rejected by tracker since it's already ready to send
-        assertFalse(tracker.addMessage(6, 6, 10));
-
-        assertEquals(tracker.getNumberOfDelayedMessages(), 5);
-        assertTrue(tracker.hasMessageAvailable());
-        Set<PositionImpl> scheduled = tracker.getScheduledMessages(10);
-        assertEquals(scheduled.size(), 1);
-
-        // Move time forward
-        clockTime.set(60);
-
-        assertEquals(tracker.getNumberOfDelayedMessages(), 4);
-        assertTrue(tracker.hasMessageAvailable());
-        scheduled = tracker.getScheduledMessages(1);
-        assertEquals(scheduled.size(), 1);
-
-        assertEquals(tracker.getNumberOfDelayedMessages(), 3);
-        assertTrue(tracker.hasMessageAvailable());
-        scheduled = tracker.getScheduledMessages(3);
-        assertEquals(scheduled.size(), 3);
-
-        assertEquals(tracker.getNumberOfDelayedMessages(), 0);
-        assertFalse(tracker.hasMessageAvailable());
-        assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
-    }
-
-    @Test
-    public void testWithTimer() throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-        Timer timer = mock(Timer.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
-
-        when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> {
-            TimerTask task = invocation.getArgument(0, TimerTask.class);
-            long timeout = invocation.getArgument(1, Long.class);
-            TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
-            long scheduleAt = clockTime.get() + unit.toMillis(timeout);
-            tasks.put(scheduleAt, task);
-
-            Timeout t = mock(Timeout.class);
-            when(t.cancel()).then(i -> {
-                tasks.remove(scheduleAt, task);
-                return null;
-            });
-            return t;
-        });
-
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                false, 0);
-
-        assertTrue(tasks.isEmpty());
-        assertTrue(tracker.addMessage(2, 2, 20));
-        assertEquals(tasks.size(), 1);
-        assertEquals(tasks.firstKey().longValue(), 20);
-
-        assertTrue(tracker.addMessage(1, 1, 10));
-        assertEquals(tasks.size(), 1);
-        assertEquals(tasks.firstKey().longValue(), 10);
-
-        assertTrue(tracker.addMessage(3, 3, 30));
-        assertEquals(tasks.size(), 1);
-        assertEquals(tasks.firstKey().longValue(), 10);
-
-        clockTime.set(15);
-
-        TimerTask task = tasks.pollFirstEntry().getValue();
-        Timeout cancelledTimeout = mock(Timeout.class);
-        when(cancelledTimeout.isCancelled()).thenReturn(true);
-        task.run(cancelledTimeout);
-        verifyZeroInteractions(dispatcher);
-
-        task.run(mock(Timeout.class));
-        verify(dispatcher).readMoreEntries();
-    }
-
-    /**
-     * Adding a message that is about to expire within the tick time should 
lead
-     * to a rejection from the tracker when 
isDelayedDeliveryDeliverAtTimeStrict is false.
-     */
-    @Test
-    public void testAddWithinTickTime() {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
-                false, 0);
-
-        clockTime.set(0);
-
-        assertFalse(tracker.addMessage(1, 1, 10));
-        assertFalse(tracker.addMessage(2, 2, 99));
-        assertFalse(tracker.addMessage(3, 3, 100));
-        assertTrue(tracker.addMessage(4, 4, 101));
-        assertTrue(tracker.addMessage(5, 5, 200));
-
-        assertEquals(tracker.getNumberOfDelayedMessages(), 2);
-    }
-
-    public void testAddMessageWithStrictDelay() {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
-                true, 0);
-
-        clockTime.set(10);
-
-        // Verify behavior for the less than, equal to, and greater than 
deliverAt times.
-        assertFalse(tracker.addMessage(1, 1, 9));
-        assertFalse(tracker.addMessage(4, 4, 10));
-        assertTrue(tracker.addMessage(1, 1, 11));
-
-        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
-        assertFalse(tracker.hasMessageAvailable());
-    }
-
-    /**
-     * In this test, the deliverAt time is after now, but the deliverAt time 
is too early to run another tick, so the
-     * tickTimeMillis determines the delay.
-     */
-    public void 
testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() 
throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        // Use a short tick time to show that the timer task is run based on 
the deliverAt time in this scenario.
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                1000, clock, true, 0);
-
-        // Set clock time, then run tracker to inherit clock time as the last 
tick time.
-        clockTime.set(10000);
-        Timeout timeout = mock(Timeout.class);
-        when(timeout.isCancelled()).then(x -> false);
-        tracker.run(timeout);
-        verify(dispatcher, times(1)).readMoreEntries();
-
-        // Add a message that has a delivery time just after the previous run. 
It will get delivered based on the
-        // tick delay plus the last tick run.
-        assertTrue(tracker.addMessage(1, 1, 10001));
-
-        // Wait longer than the tick time plus the HashedWheelTimer's tick 
time to ensure that enough time has
-        // passed where it would have been triggered if the tick time was 
doing the triggering.
-        Thread.sleep(600);
-        verify(dispatcher, times(1)).readMoreEntries();
-
-        // Not wait for the message delivery to get triggered.
-        Awaitility.await().atMost(10, TimeUnit.SECONDS)
-                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
-    }
-
-    /**
-     * In this test, the deliverAt time is after now, but before the 
(tickTimeMillis + now). Because there wasn't a
-     * recent tick run, the deliverAt time determines the delay.
-     */
-    public void 
testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        // Use a large tick time to show that the message will get delivered 
earlier because there wasn't
-        // a previous tick run.
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                100000, clock, true, 0);
-
-        clockTime.set(500000);
-
-        assertTrue(tracker.addMessage(1, 1, 500005));
-
-        // Wait long enough for the runnable to run, but not longer than the 
tick time. The point is that the delivery
-        // should get scheduled early when the tick duration has passed since 
the last tick.
-        Awaitility.await().atMost(10, TimeUnit.SECONDS)
-                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
-    }
-
-    /**
-     * In this test, the deliverAt time is after now plus tickTimeMillis, so 
the tickTimeMillis determines the delay.
-     */
-    public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() 
throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        // Use a short tick time to show that the timer task is run based on 
the deliverAt time in this scenario.
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                500, clock, true, 0);
-
-        clockTime.set(0);
-
-        assertTrue(tracker.addMessage(1, 1, 2000));
-
-        // Wait longer than the tick time plus the HashedWheelTimer's tick 
time to ensure that enough time has
-        // passed where it would have been triggered if the tick time was 
doing the triggering.
-        Thread.sleep(1000);
-        verifyNoInteractions(dispatcher);
-
-        // Not wait for the message delivery to get triggered.
-        Awaitility.await().atMost(10, TimeUnit.SECONDS)
-                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+        final String methodName = method.getName();
+        return switch (methodName) {
+            case "test" -> new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                            false, 0)
+            }};
+            case "testWithTimer" -> {
+                Timer timer = mock(Timer.class);
+
+                AtomicLong clockTime = new AtomicLong();
+                Clock clock = mock(Clock.class);
+                when(clock.millis()).then(x -> clockTime.get());
+
+                NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
+
+                when(timer.newTimeout(any(), anyLong(), 
any())).then(invocation -> {
+                    TimerTask task = invocation.getArgument(0, 
TimerTask.class);
+                    long timeout = invocation.getArgument(1, Long.class);
+                    TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+                    long scheduleAt = clockTime.get() + unit.toMillis(timeout);
+                    tasks.put(scheduleAt, task);
+
+                    Timeout t = mock(Timeout.class);
+                    when(t.cancel()).then(i -> {
+                        tasks.remove(scheduleAt, task);
+                        return null;
+                    });
+                    return t;
+                });
+
+                yield new Object[][]{{
+                        new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
1, clock,
+                                false, 0),
+                        tasks
+                }};
+            }
+            case "testAddWithinTickTime" -> new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
+                            false, 0)
+            }};
+            case "testAddMessageWithStrictDelay" -> new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
+                            true, 0)
+            }};
+            case 
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> 
new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
1000, clock,
+                            true, 0)
+            }};
+            case 
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> 
new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
100000, clock,
+                            true, 0)
+            }};
+            case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" 
-> new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, 
clock,
+                            true, 0)
+            }};
+            case "testWithFixedDelays", 
"testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, 
clock,
+                            true, 100)
+            }};
+            default -> new Object[][]{{
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                            true, 0)
+            }};
+        };
     }
 
-    @Test
-    public void testWithFixedDelays() throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        final long fixedDelayLookahead = 100;
-
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true, fixedDelayLookahead);
-
+    @Test(dataProvider = "delayedTracker")
+    public void testWithFixedDelays(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
         assertFalse(tracker.hasMessageAvailable());
 
         assertTrue(tracker.addMessage(1, 1, 10));
@@ -338,15 +132,16 @@ public class InMemoryDeliveryTrackerTest {
         assertEquals(tracker.getNumberOfDelayedMessages(), 5);
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= fixedDelayLookahead; i++) {
+        for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
         assertTrue(tracker.shouldPauseAllDeliveries());
 
-        clockTime.set(fixedDelayLookahead * 10);
+        clockTime.set(tracker.getFixedDelayDetectionLookahead() * 10);
 
         tracker.getScheduledMessages(100);
+
         assertFalse(tracker.shouldPauseAllDeliveries());
 
         // Empty the tracker
@@ -356,22 +151,12 @@ public class InMemoryDeliveryTrackerTest {
         } while (removed > 0);
 
         assertFalse(tracker.shouldPauseAllDeliveries());
-    }
-
-    @Test
-    public void testWithMixedDelays() throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        long fixedDelayLookahead = 100;
 
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true, fixedDelayLookahead);
+        tracker.close();
+    }
 
+    @Test(dataProvider = "delayedTracker")
+    public void testWithMixedDelays(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
         assertFalse(tracker.hasMessageAvailable());
 
         assertTrue(tracker.addMessage(1, 1, 10));
@@ -382,32 +167,22 @@ public class InMemoryDeliveryTrackerTest {
 
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= fixedDelayLookahead; i++) {
+        for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
         assertTrue(tracker.shouldPauseAllDeliveries());
 
         // Add message with earlier delivery time
-        assertTrue(tracker.addMessage(5, 5, 5));
+        assertTrue(tracker.addMessage(5, 6, 5));
 
         assertFalse(tracker.shouldPauseAllDeliveries());
-    }
-
-    @Test
-    public void testWithNoDelays() throws Exception {
-        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
-
-        AtomicLong clockTime = new AtomicLong();
-        Clock clock = mock(Clock.class);
-        when(clock.millis()).then(x -> clockTime.get());
-
-        long fixedDelayLookahead = 100;
 
-        @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true, fixedDelayLookahead);
+        tracker.close();
+    }
 
+    @Test(dataProvider = "delayedTracker")
+    public void testWithNoDelays(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
         assertFalse(tracker.hasMessageAvailable());
 
         assertTrue(tracker.addMessage(1, 1, 10));
@@ -418,16 +193,18 @@ public class InMemoryDeliveryTrackerTest {
 
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= fixedDelayLookahead; i++) {
+        for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
         assertTrue(tracker.shouldPauseAllDeliveries());
 
         // Add message with no-delay
-        assertFalse(tracker.addMessage(5, 5, -1L));
+        assertFalse(tracker.addMessage(5, 6, -1L));
 
         assertFalse(tracker.shouldPauseAllDeliveries());
+
+        tracker.close();
     }
 
     @Test
@@ -471,5 +248,4 @@ public class InMemoryDeliveryTrackerTest {
 
         timer.stop();
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
new file mode 100644
index 00000000000..89831a1d5e7
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+/**
+ * In-memory {@link BucketSnapshotStorage} for tests.
+ *
+ * <p>Every bucket is kept as a list of buffers: index 0 holds the serialized {@link SnapshotMetadata} and the
+ * following indexes hold the serialized {@link SnapshotSegment}s in the order they were supplied. All
+ * operations are executed on a private thread pool and report their outcome through the returned future.
+ */
+@Slf4j
+public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private final AtomicLong maxBucketId;
+
+    private final Map<Long, List<ByteBuf>> bucketSnapshots;
+
+    private final ExecutorService executorService =
+            new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+                    new DefaultThreadFactory("bucket-snapshot-storage-io"));
+
+    public MockBucketSnapshotStorage() {
+        this.bucketSnapshots = new ConcurrentHashMap<>();
+        this.maxBucketId = new AtomicLong();
+    }
+
+    /**
+     * Stores the metadata followed by all segments under a newly allocated bucket id.
+     *
+     * @param snapshotMetadata       metadata, stored as entry 0
+     * @param bucketSnapshotSegments segments, stored as entries 1..n
+     * @return future completed with the id of the new bucket
+     */
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(
+            SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments) {
+        return CompletableFuture.supplyAsync(() -> {
+            long bucketId = maxBucketId.getAndIncrement();
+            List<ByteBuf> entries = new ArrayList<>(bucketSnapshotSegments.size() + 1);
+            entries.add(toDirectBuffer(snapshotMetadata.toByteArray()));
+            for (SnapshotSegment snapshotSegment : bucketSnapshotSegments) {
+                entries.add(toDirectBuffer(snapshotSegment.toByteArray()));
+            }
+            // Publish only once the list is complete: the ArrayList is not thread-safe and readers
+            // must never observe a bucket that contains the metadata but not yet its segments.
+            this.bucketSnapshots.put(bucketId, entries);
+            return bucketId;
+        }, executorService);
+    }
+
+    /**
+     * Parses and returns the metadata stored as entry 0 of the bucket.
+     *
+     * @param bucketId id returned by {@link #createBucketSnapshot}
+     * @return future completed with the metadata; completed exceptionally if the bucket is unknown or the
+     *         stored bytes cannot be parsed
+     */
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return CompletableFuture.supplyAsync(() -> {
+            ByteBuf byteBuf = entriesOf(bucketId).get(0);
+            try {
+                return SnapshotMetadata.parseFrom(byteBuf.nioBuffer());
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        }, executorService);
+    }
+
+    /**
+     * Parses and returns the segments stored in the entry range
+     * {@code [firstSegmentEntryId, lastSegmentEntryId]}. The upper bound is clamped to the last stored entry.
+     *
+     * @param bucketId            id returned by {@link #createBucketSnapshot}
+     * @param firstSegmentEntryId first entry to read (inclusive)
+     * @param lastSegmentEntryId  last entry to read (inclusive)
+     * @return future completed with the parsed segments; completed exceptionally if the bucket is unknown
+     *         or an entry cannot be parsed
+     */
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return CompletableFuture.supplyAsync(() -> {
+            List<ByteBuf> entries = entriesOf(bucketId);
+            List<SnapshotSegment> snapshotSegments = new ArrayList<>();
+            // Valid entry ids are 0..size-1, so clamp to the last index, not to the size.
+            long lastEntryId = Math.min(lastSegmentEntryId, entries.size() - 1);
+            for (int i = (int) firstSegmentEntryId; i <= lastEntryId; i++) {
+                try {
+                    snapshotSegments.add(SnapshotSegment.parseFrom(entries.get(i).nioBuffer()));
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            return snapshotSegments;
+        }, executorService);
+    }
+
+    /**
+     * Removes the bucket and releases its buffers. Deleting an unknown bucket is a no-op.
+     *
+     * @param bucketId id of the bucket to delete
+     * @return future completed once the bucket has been removed
+     */
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return CompletableFuture.runAsync(() -> {
+            List<ByteBuf> removed = this.bucketSnapshots.remove(bucketId);
+            if (removed != null) {
+                removed.forEach(ByteBuf::release);
+            }
+        }, executorService);
+    }
+
+    /**
+     * Computes the total number of stored bytes (metadata plus segments) of the bucket.
+     *
+     * @param bucketId id returned by {@link #createBucketSnapshot}
+     * @return future completed with the length in bytes; completed exceptionally if the bucket is unknown
+     */
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return CompletableFuture.supplyAsync(() -> {
+            long length = 0;
+            for (ByteBuf byteBuf : entriesOf(bucketId)) {
+                // The buffers are direct, so array() is unsupported; readableBytes() is the payload size.
+                length += byteBuf.readableBytes();
+            }
+            return length;
+        }, executorService);
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        clean();
+    }
+
+    /**
+     * Stops the worker pool, then releases every stored buffer and forgets all buckets.
+     */
+    public void clean() {
+        // Stop accepting and interrupt running tasks first so that they do not touch released buffers.
+        executorService.shutdownNow();
+        for (List<ByteBuf> value : bucketSnapshots.values()) {
+            for (ByteBuf byteBuf : value) {
+                byteBuf.release();
+            }
+        }
+        bucketSnapshots.clear();
+    }
+
+    /** Copies the bytes into a freshly allocated pooled direct buffer owned by this storage. */
+    private static ByteBuf toDirectBuffer(byte[] bytes) {
+        ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(bytes.length);
+        byteBuf.writeBytes(bytes);
+        return byteBuf;
+    }
+
+    /** Returns the entries of the bucket, failing with a descriptive error when the bucket does not exist. */
+    private List<ByteBuf> entriesOf(long bucketId) {
+        List<ByteBuf> entries = this.bucketSnapshots.get(bucketId);
+        if (entries == null) {
+            throw new IllegalArgumentException("Bucket snapshot not found, bucketId: " + bucketId);
+        }
+        return entries;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
new file mode 100644
index 00000000000..efb0fa7ab7b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import com.google.common.collect.Range;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+public class MockManagedCursor implements ManagedCursor {
+
+    private final String name;
+
+    private final Map<String, String> cursorProperties;
+
+    public MockManagedCursor(String name) {
+        this.name = name;
+        this.cursorProperties = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public String getName() {
+        return null;
+    }
+
+    @Override
+    public long getLastActive() {
+        return 0;
+    }
+
+    @Override
+    public void updateLastActive() {
+
+    }
+
+    @Override
+    public Map<String, Long> getProperties() {
+        return null;
+    }
+
+    @Override
+    public Map<String, String> getCursorProperties() {
+        return this.cursorProperties;
+    }
+
+    @Override
+    public CompletableFuture<Void> putCursorProperty(String key, String value) 
{
+        cursorProperties.put(key, value);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> 
cursorProperties) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    public CompletableFuture<Void> removeCursorProperty(String key) {
+        cursorProperties.remove(key);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean putProperty(String key, Long value) {
+        return false;
+    }
+
+    @Override
+    public boolean removeProperty(String key) {
+        return false;
+    }
+
+    @Override
+    public List<Entry> readEntries(int numberOfEntriesToRead) throws 
InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncReadEntries(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback, Object ctx,
+                                 PositionImpl maxPosition) {
+
+    }
+
+    @Override
+    public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes,
+                                 AsyncCallbacks.ReadEntriesCallback callback, 
Object ctx, PositionImpl maxPosition) {
+
+    }
+
+    @Override
+    public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries)
+            throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncGetNthEntry(int n, IndividualDeletedEntries 
deletedEntries,
+                                 AsyncCallbacks.ReadEntryCallback callback, 
Object ctx) {
+
+    }
+
+    @Override
+    public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
+            throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
+            throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncReadEntriesOrWait(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback,
+                                       Object ctx, PositionImpl maxPosition) {
+
+    }
+
+    @Override
+    public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, 
AsyncCallbacks.ReadEntriesCallback callback,
+                                       Object ctx, PositionImpl maxPosition) {
+
+    }
+
+    @Override
+    public boolean cancelPendingReadRequest() {
+        return false;
+    }
+
+    @Override
+    public boolean hasMoreEntries() {
+        return false;
+    }
+
+    @Override
+    public long getNumberOfEntries() {
+        return 0;
+    }
+
+    @Override
+    public long getNumberOfEntriesInBacklog(boolean isPrecise) {
+        return 0;
+    }
+
+    @Override
+    public void markDelete(Position position) throws InterruptedException, 
ManagedLedgerException {
+
+    }
+
+    @Override
+    public void markDelete(Position position, Map<String, Long> properties)
+            throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncMarkDelete(Position position, 
AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void asyncMarkDelete(Position position, Map<String, Long> 
properties,
+                                AsyncCallbacks.MarkDeleteCallback callback, 
Object ctx) {
+
+    }
+
+    @Override
+    public void delete(Position position) throws InterruptedException, 
ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncDelete(Position position, AsyncCallbacks.DeleteCallback 
callback, Object ctx) {
+
+    }
+
+    @Override
+    public void delete(Iterable<Position> positions) throws 
InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncDelete(Iterable<Position> position, 
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public Position getReadPosition() {
+        return null;
+    }
+
+    @Override
+    public Position getMarkDeletedPosition() {
+        return null;
+    }
+
+    @Override
+    public Position getPersistentMarkDeletedPosition() {
+        return null;
+    }
+
+    @Override
+    public void rewind() {
+
+    }
+
+    @Override
+    public void seek(Position newReadPosition, boolean force) {
+
+    }
+
+    @Override
+    public void clearBacklog() throws InterruptedException, 
ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncClearBacklog(AsyncCallbacks.ClearBacklogCallback 
callback, Object ctx) {
+
+    }
+
+    @Override
+    public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries 
deletedEntries)
+            throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncSkipEntries(int numEntriesToSkip, 
IndividualDeletedEntries deletedEntries,
+                                 AsyncCallbacks.SkipEntriesCallback callback, 
Object ctx) {
+
+    }
+
+    @Override
+    public Position findNewestMatching(java.util.function.Predicate<Entry> 
condition)
+            throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public Position findNewestMatching(FindPositionConstraint constraint, 
java.util.function.Predicate<Entry> condition)
+            throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncFindNewestMatching(FindPositionConstraint constraint,
+                                        java.util.function.Predicate<Entry> 
condition,
+                                        AsyncCallbacks.FindEntryCallback 
callback, Object ctx) {
+
+    }
+
+    @Override
+    public void resetCursor(Position position) throws InterruptedException, 
ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncResetCursor(Position position, boolean forceReset, 
AsyncCallbacks.ResetCursorCallback callback) {
+
+    }
+
+    @Override
+    public List<Entry> replayEntries(Set<? extends Position> positions)
+            throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public Set<? extends Position> asyncReplayEntries(Set<? extends Position> 
positions,
+                                                      
AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+        return null;
+    }
+
+    @Override
+    public Set<? extends Position> asyncReplayEntries(Set<? extends Position> 
positions,
+                                                      
AsyncCallbacks.ReadEntriesCallback callback, Object ctx,
+                                                      boolean sortEntries) {
+        return null;
+    }
+
+    @Override
+    public void close() throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public Position getFirstPosition() {
+        return null;
+    }
+
+    @Override
+    public void setActive() {
+
+    }
+
+    @Override
+    public void setInactive() {
+
+    }
+
+    @Override
+    public void setAlwaysInactive() {
+
+    }
+
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    @Override
+    public boolean isDurable() {
+        return false;
+    }
+
+    @Override
+    public long getNumberOfEntriesSinceFirstNotAckedMessage() {
+        return 0;
+    }
+
+    @Override
+    public int getTotalNonContiguousDeletedMessagesRange() {
+        return 0;
+    }
+
+    @Override
+    public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+        return 0;
+    }
+
+    @Override
+    public long getEstimatedSizeSinceMarkDeletePosition() {
+        return 0;
+    }
+
+    @Override
+    public double getThrottleMarkDelete() {
+        return 0;
+    }
+
+    @Override
+    public void setThrottleMarkDelete(double throttleMarkDelete) {
+
+    }
+
+    @Override
+    public ManagedLedger getManagedLedger() {
+        return null;
+    }
+
+    @Override
+    public Range<PositionImpl> getLastIndividualDeletedRange() {
+        return null;
+    }
+
+    @Override
+    public void trimDeletedEntries(List<Entry> entries) {
+
+    }
+
+    @Override
+    public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
+        return new long[0];
+    }
+
+    @Override
+    public ManagedCursorMXBean getStats() {
+        return null;
+    }
+
+    @Override
+    public boolean checkAndUpdateReadPositionChanged() {
+        return false;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+}

Reply via email to