This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new da3cab5289c [improve][broker][PIP-195] Cut off snapshot segment
according to maxIndexesPerBucketSnapshotSegment (#19706)
da3cab5289c is described below
commit da3cab5289c41cfe1e064b02ce4794224aeaea04
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Mar 14 22:06:21 2023 +0800
[improve][broker][PIP-195] Cut off snapshot segment according to
maxIndexesPerBucketSnapshotSegment (#19706)
---
conf/broker.conf | 7 ++--
conf/standalone.conf | 42 ++++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 12 ++++---
.../BucketDelayedDeliveryTrackerFactory.java | 8 +++--
.../bucket/BucketDelayedDeliveryTracker.java | 19 ++++++----
.../broker/delayed/bucket/MutableBucket.java | 10 ++++--
.../bucket/BucketDelayedDeliveryTrackerTest.java | 41 ++++++++++++++-------
.../persistent/BucketDelayedDeliveryTest.java | 1 +
8 files changed, 109 insertions(+), 31 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 4b7c108be5f..d52adb25456 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -571,13 +571,14 @@ delayedDeliveryMinIndexCountPerBucket=50000
# after reaching the max time step limitation, the snapshot segment will be
cut off.
delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300
+# The max number of delayed message index in per bucket snapshot segment, -1
means no limitation,
+# after reaching the max number limitation, the snapshot segment will be cut
off.
+delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
+
# The max number of delayed message index bucket,
# after reaching the max buckets limitation, the adjacent buckets will be
merged.
delayedDeliveryMaxNumBuckets=50
-# Enable share the delayed message index across subscriptions
-delayedDeliverySharedIndexEnabled=false
-
# Size of the lookahead window to use when detecting if all the messages in
the topic
# have a fixed delay.
# Default is 50,000. Setting the lookahead window to 0 will disable the logic
to handle
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ed883406883..f141946c29f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1223,3 +1223,45 @@ configurationStoreServers=
# zookeeper.
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
+
+# Whether to enable the delayed delivery for messages.
+# If disabled, messages will be immediately delivered and there will
+# be no tracking overhead.
+delayedDeliveryEnabled=true
+
+# Class name of the factory that implements the delayed delivery tracker.
+# If value is
"org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
+# will create bucket based delayed message index tracker.
+delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory
+
+# Control the tick time for when retrying on delayed delivery,
+# affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time
for the
+# InMemoryDelayedDeliveryTrackerFactory (the default
DelayedDeliverTrackerFactory).
+# Default is 1 second.
+delayedDeliveryTickTimeMillis=1000
+
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default
DelayedDeliverTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may
be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the
broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be
sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the
tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
+# The delayed message index bucket min index count.
+# When the index count of the current bucket is more than this value and all
message indexes of current ledger
+# have already been added to the tracker we will seal the bucket.
+delayedDeliveryMinIndexCountPerBucket=50000
+
+# The delayed message index bucket time step(in seconds) in per bucket
snapshot segment,
+# after reaching the max time step limitation, the snapshot segment will be
cut off.
+delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300
+
+# The max number of delayed message index in per bucket snapshot segment, -1
means no limitation,
+# after reaching the max number limitation, the snapshot segment will be cut
off.
+delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
+
+# The max number of delayed message index bucket,
+# after reaching the max buckets limitation, the adjacent buckets will be
merged.
+delayedDeliveryMaxNumBuckets=50
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3c00e905ac7..ff242888ae0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -361,18 +361,20 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private long delayedDeliveryMinIndexCountPerBucket = 50000;
@FieldContext(category = CATEGORY_SERVER, doc = """
- The delayed message index bucket time step(in seconds) in per
bucket snapshot segment, \
+ The delayed message index time step(in seconds) in per bucket
snapshot segment, \
after reaching the max time step limitation, the snapshot segment
will be cut off.""")
- private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
300;
+ private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
300;
+
+ @FieldContext(category = CATEGORY_SERVER, doc = """
+ The max number of delayed message index in per bucket snapshot
segment, -1 means no limitation, \
+ after reaching the max number limitation, the snapshot segment
will be cut off.""")
+ private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;
@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index bucket, \
after reaching the max buckets limitation, the adjacent buckets
will be merged.""")
private int delayedDeliveryMaxNumBuckets = 50;
- @FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed
message index across subscriptions")
- private boolean delayedDeliverySharedIndexEnabled = false;
-
@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead
window to use "
+ "when detecting if all the messages in the topic have a fixed
delay. "
+ "Default is 50,000. Setting the lookahead window to 0 will
disable the "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index ae9cb23ceb9..f0feb8b27d6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -43,7 +43,9 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
private long delayedDeliveryMinIndexCountPerBucket;
- private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
+ private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
+
+ private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment;
@Override
public void initialize(PulsarService pulsarService) throws Exception {
@@ -58,6 +60,8 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
this.delayedDeliveryMaxNumBuckets =
config.getDelayedDeliveryMaxNumBuckets();
this.delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
config.getDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds();
+ this.delayedDeliveryMaxIndexesPerBucketSnapshotSegment =
+ config.getDelayedDeliveryMaxIndexesPerBucketSnapshotSegment();
}
@Override
@@ -65,7 +69,7 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
return new BucketDelayedDeliveryTracker(dispatcher, timer,
tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
- delayedDeliveryMaxNumBuckets);
+ delayedDeliveryMaxIndexesPerBucketSnapshotSegment,
delayedDeliveryMaxNumBuckets);
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index a77b272297b..22689cd737b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -70,6 +70,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final long timeStepPerBucketSnapshotSegmentInMillis;
+ private final int maxIndexesPerBucketSnapshotSegment;
+
private final int maxNumBuckets;
private long numberDelayedMessages;
@@ -89,9 +91,10 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
- int maxNumBuckets) {
+ int maxIndexesPerBucketSnapshotSegment, int
maxNumBuckets) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict,
- bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
+ bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis,
+ maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
@@ -99,10 +102,11 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
- int maxNumBuckets) {
+ int maxIndexesPerBucketSnapshotSegment, int
maxNumBuckets) {
super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis =
timeStepPerBucketSnapshotSegmentInMillis;
+ this.maxIndexesPerBucketSnapshotSegment =
maxIndexesPerBucketSnapshotSegment;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
@@ -292,7 +296,9 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
-
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
+ lastMutableBucket.sealBucketAndAsyncPersistent(
+ this.timeStepPerBucketSnapshotSegmentInMillis,
+ this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
lastMutableBucket.resetLastMutableBucketRange();
@@ -380,8 +386,9 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
- timeStepPerBucketSnapshotSegmentInMillis,
sharedBucketPriorityQueue,
- combinedDelayedIndexQueue,
bucketA.startLedgerId, bucketB.endLedgerId);
+ timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment,
+ sharedBucketPriorityQueue,
combinedDelayedIndexQueue, bucketA.startLedgerId,
+ bucketB.endLedgerId);
// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA =
bucketA.getDelayedIndexBitMap();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 40ba8f4c4b5..8187f8f2de1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -50,13 +50,15 @@ class MutableBucket extends Bucket implements AutoCloseable
{
Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
long timeStepPerBucketSnapshotSegment,
+ int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue) {
- return
createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
sharedQueue,
+ return
createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
+ maxIndexesPerBucketSnapshotSegment, sharedQueue,
TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue),
startLedgerId, endLedgerId);
}
Pair<ImmutableBucket, DelayedIndex>
createImmutableBucketAndAsyncPersistent(
- final long timeStepPerBucketSnapshotSegment,
+ final long timeStepPerBucketSnapshotSegment, final int
maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue
delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
log.info("[{}] Creating bucket snapshot, startLedgerId: {},
endLedgerId: {}", dispatcherName,
@@ -98,7 +100,9 @@ class MutableBucket extends Bucket implements AutoCloseable {
snapshotSegmentBuilder.addIndexes(delayedIndex);
- if (delayedIndexQueue.isEmpty() ||
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
+ if (delayedIndexQueue.isEmpty() ||
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
+ || (maxIndexesPerBucketSnapshotSegment != -1
+ && snapshotSegmentBuilder.getIndexesCount() >=
maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
currentTimestampUpperLimit = 0;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 0ba9e5f4ca2..378ad205bb6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -85,7 +85,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
return switch (methodName) {
case "test" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
- false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testWithTimer" -> {
Timer timer = mock(Timer.class);
@@ -113,39 +113,43 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
yield new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
- false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50),
+ false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50),
tasks
}};
}
case "testAddWithinTickTime" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
- false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict",
"testRecoverSnapshot" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher,
timer, 100000, clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict",
"testExistDelayedMessage" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher,
timer, 500, clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testMergeSnapshot", "testWithBkException",
"testWithCreateFailDowngrade" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer,
100000, clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 10)
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 10)
+ }};
+ case "testMaxIndexesPerSegment" -> new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer,
100000, clock,
+ true, bucketSnapshotStorage, 20,
TimeUnit.HOURS.toMillis(1), 5, 100)
}};
default -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
- true, bucketSnapshotStorage, 1000,
TimeUnit.MILLISECONDS.toMillis(100), 50)
+ true, bucketSnapshotStorage, 1000,
TimeUnit.MILLISECONDS.toMillis(100), -1, 50)
}};
};
}
@@ -196,7 +200,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
clockTime.set(30 * 10);
tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50);
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1,50);
assertFalse(tracker.containsMessage(101, 101));
assertEquals(tracker.getNumberOfDelayedMessages(), 70);
@@ -268,7 +272,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
tracker.close();
tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 10);
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
assertEquals(tracker.getNumberOfDelayedMessages(),
delayedMessagesInSnapshot.getValue());
@@ -318,7 +322,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
tracker.close();
tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 10);
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
Long delayedMessagesInSnapshotValue =
delayedMessagesInSnapshot.getValue();
assertEquals(tracker.getNumberOfDelayedMessages(),
delayedMessagesInSnapshotValue);
@@ -374,4 +378,17 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
assertEquals(position, PositionImpl.get(i, i));
}
}
+
+ @Test(dataProvider = "delayedTracker")
+ public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker tracker)
{
+ for (int i = 1; i <= 101; i++) {
+ tracker.addMessage(i, i, i * 10);
+ }
+
+ assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 5);
+
+ tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
+ assertEquals(bucket.getLastSegmentEntryId(), 4);
+ });
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 5d81ba8bc02..fa846779b08 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -32,6 +32,7 @@ public class BucketDelayedDeliveryTest extends
DelayedDeliveryTest {
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
conf.setDelayedDeliveryMaxNumBuckets(10);
conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
+ conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
conf.setDelayedDeliveryMinIndexCountPerBucket(50);
conf.setManagedLedgerMaxEntriesPerLedger(50);
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);