This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da3cab5289c [improve][broker][PIP-195] Cut off snapshot segment 
according to maxIndexesPerBucketSnapshotSegment (#19706)
da3cab5289c is described below

commit da3cab5289c41cfe1e064b02ce4794224aeaea04
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Mar 14 22:06:21 2023 +0800

    [improve][broker][PIP-195] Cut off snapshot segment according to 
maxIndexesPerBucketSnapshotSegment (#19706)
---
 conf/broker.conf                                   |  7 ++--
 conf/standalone.conf                               | 42 ++++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 12 ++++---
 .../BucketDelayedDeliveryTrackerFactory.java       |  8 +++--
 .../bucket/BucketDelayedDeliveryTracker.java       | 19 ++++++----
 .../broker/delayed/bucket/MutableBucket.java       | 10 ++++--
 .../bucket/BucketDelayedDeliveryTrackerTest.java   | 41 ++++++++++++++-------
 .../persistent/BucketDelayedDeliveryTest.java      |  1 +
 8 files changed, 109 insertions(+), 31 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 4b7c108be5f..d52adb25456 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -571,13 +571,14 @@ delayedDeliveryMinIndexCountPerBucket=50000
 # after reaching the max time step limitation, the snapshot segment will be 
cut off.
 delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300
 
+# The max number of delayed message index in per bucket snapshot segment, -1 
means no limitation
+# after reaching the max number limitation, the snapshot segment will be cut 
off.
+delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
+
 # The max number of delayed message index bucket,
 # after reaching the max buckets limitation, the adjacent buckets will be 
merged.
 delayedDeliveryMaxNumBuckets=50
 
-# Enable share the delayed message index across subscriptions
-delayedDeliverySharedIndexEnabled=false
-
 # Size of the lookahead window to use when detecting if all the messages in 
the topic
 # have a fixed delay.
 # Default is 50,000. Setting the lookahead window to 0 will disable the logic 
to handle
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ed883406883..f141946c29f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1223,3 +1223,45 @@ configurationStoreServers=
 # zookeeper.
 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
 managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
+
+# Whether to enable the delayed delivery for messages.
+# If disabled, messages will be immediately delivered and there will
+# be no tracking overhead.
+delayedDeliveryEnabled=true
+
+# Class name of the factory that implements the delayed deliver tracker.
+# If value is 
"org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
+# will create bucket based delayed message index tracker.
+delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory
+
+# Control the tick time for when retrying on delayed delivery,
+# affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time 
for the
+# InMemoryDelayedDeliveryTrackerFactory (the default 
DelayedDeliverTrackerFactory).
+# Default is 1 second.
+delayedDeliveryTickTimeMillis=1000
+
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default 
DelayedDeliverTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may 
be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the 
broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be 
sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the 
tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
+# The delayed message index bucket min index count.
+# When the index count of the current bucket is more than this value and all 
message indexes of current ledger
+# have already been added to the tracker we will seal the bucket.
+delayedDeliveryMinIndexCountPerBucket=50000
+
+# The delayed message index bucket time step(in seconds) in per bucket 
snapshot segment,
+# after reaching the max time step limitation, the snapshot segment will be 
cut off.
+delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300
+
+# The max number of delayed message index in per bucket snapshot segment, -1 
means no limitation
+# after reaching the max number limitation, the snapshot segment will be cut 
off.
+delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
+
+# The max number of delayed message index bucket,
+# after reaching the max buckets limitation, the adjacent buckets will be 
merged.
+delayedDeliveryMaxNumBuckets=50
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3c00e905ac7..ff242888ae0 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -361,18 +361,20 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private long delayedDeliveryMinIndexCountPerBucket = 50000;
 
     @FieldContext(category = CATEGORY_SERVER, doc = """
-            The delayed message index bucket time step(in seconds) in per 
bucket snapshot segment, \
+            The delayed message index time step(in seconds) in per bucket 
snapshot segment, \
             after reaching the max time step limitation, the snapshot segment 
will be cut off.""")
-    private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 
300;
+    private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 
300;
+
+    @FieldContext(category = CATEGORY_SERVER, doc = """
+            The max number of delayed message index in per bucket snapshot 
segment, -1 means no limitation\
+            after reaching the max number limitation, the snapshot segment 
will be cut off.""")
+    private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;
 
     @FieldContext(category = CATEGORY_SERVER, doc = """
             The max number of delayed message index bucket, \
             after reaching the max buckets limitation, the adjacent buckets 
will be merged.""")
     private int delayedDeliveryMaxNumBuckets = 50;
 
-    @FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed 
message index across subscriptions")
-    private boolean delayedDeliverySharedIndexEnabled = false;
-
     @FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead 
window to use "
             + "when detecting if all the messages in the topic have a fixed 
delay. "
             + "Default is 50,000. Setting the lookahead window to 0 will 
disable the "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index ae9cb23ceb9..f0feb8b27d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -43,7 +43,9 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
 
     private long delayedDeliveryMinIndexCountPerBucket;
 
-    private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
+    private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
+
+    private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment;
 
     @Override
     public void initialize(PulsarService pulsarService) throws Exception {
@@ -58,6 +60,8 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
         this.delayedDeliveryMaxNumBuckets = 
config.getDelayedDeliveryMaxNumBuckets();
         this.delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
                 
config.getDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds();
+        this.delayedDeliveryMaxIndexesPerBucketSnapshotSegment =
+                config.getDelayedDeliveryMaxIndexesPerBucketSnapshotSegment();
     }
 
     @Override
@@ -65,7 +69,7 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
         return new BucketDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
                 bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
                 
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
-                delayedDeliveryMaxNumBuckets);
+                delayedDeliveryMaxIndexesPerBucketSnapshotSegment, 
delayedDeliveryMaxNumBuckets);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index a77b272297b..22689cd737b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -70,6 +70,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final long timeStepPerBucketSnapshotSegmentInMillis;
 
+    private final int maxIndexesPerBucketSnapshotSegment;
+
     private final int maxNumBuckets;
 
     private long numberDelayedMessages;
@@ -89,9 +91,10 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
                                  BucketSnapshotStorage bucketSnapshotStorage,
                                  long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
-                                 int maxNumBuckets) {
+                                 int maxIndexesPerBucketSnapshotSegment, int 
maxNumBuckets) {
         this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
-                bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
+                bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegmentInMillis,
+                maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
     }
 
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
@@ -99,10 +102,11 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
                                  BucketSnapshotStorage bucketSnapshotStorage,
                                  long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
-                                 int maxNumBuckets) {
+                                 int maxIndexesPerBucketSnapshotSegment, int 
maxNumBuckets) {
         super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.minIndexCountPerBucket = minIndexCountPerBucket;
         this.timeStepPerBucketSnapshotSegmentInMillis = 
timeStepPerBucketSnapshotSegmentInMillis;
+        this.maxIndexesPerBucketSnapshotSegment = 
maxIndexesPerBucketSnapshotSegment;
         this.maxNumBuckets = maxNumBuckets;
         this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
         this.immutableBuckets = TreeRangeMap.create();
@@ -292,7 +296,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 && lastMutableBucket.size() >= minIndexCountPerBucket
                 && !lastMutableBucket.isEmpty()) {
             Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
-                    
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
+                    lastMutableBucket.sealBucketAndAsyncPersistent(
+                            this.timeStepPerBucketSnapshotSegmentInMillis,
+                            this.maxIndexesPerBucketSnapshotSegment,
                             this.sharedBucketPriorityQueue);
             afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
             lastMutableBucket.resetLastMutableBucketRange();
@@ -380,8 +386,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 .thenAccept(combinedDelayedIndexQueue -> {
                     Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
                             
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
-                                    timeStepPerBucketSnapshotSegmentInMillis, 
sharedBucketPriorityQueue,
-                                    combinedDelayedIndexQueue, 
bucketA.startLedgerId, bucketB.endLedgerId);
+                                    timeStepPerBucketSnapshotSegmentInMillis, 
maxIndexesPerBucketSnapshotSegment,
+                                    sharedBucketPriorityQueue, 
combinedDelayedIndexQueue, bucketA.startLedgerId,
+                                    bucketB.endLedgerId);
 
                     // Merge bit map to new bucket
                     Map<Long, RoaringBitmap> delayedIndexBitMapA = 
bucketA.getDelayedIndexBitMap();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 40ba8f4c4b5..8187f8f2de1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -50,13 +50,15 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
 
     Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
             long timeStepPerBucketSnapshotSegment,
+            int maxIndexesPerBucketSnapshotSegment,
             TripleLongPriorityQueue sharedQueue) {
-        return 
createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, 
sharedQueue,
+        return 
createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
+                maxIndexesPerBucketSnapshotSegment, sharedQueue,
                 TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), 
startLedgerId, endLedgerId);
     }
 
     Pair<ImmutableBucket, DelayedIndex> 
createImmutableBucketAndAsyncPersistent(
-            final long timeStepPerBucketSnapshotSegment,
+            final long timeStepPerBucketSnapshotSegment, final int 
maxIndexesPerBucketSnapshotSegment,
             TripleLongPriorityQueue sharedQueue, DelayedIndexQueue 
delayedIndexQueue, final long startLedgerId,
             final long endLedgerId) {
         log.info("[{}] Creating bucket snapshot, startLedgerId: {}, 
endLedgerId: {}", dispatcherName,
@@ -98,7 +100,9 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
             snapshotSegmentBuilder.addIndexes(delayedIndex);
 
-            if (delayedIndexQueue.isEmpty() || 
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
+            if (delayedIndexQueue.isEmpty() || 
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
+                    || (maxIndexesPerBucketSnapshotSegment != -1
+                    && snapshotSegmentBuilder.getIndexesCount() >= 
maxIndexesPerBucketSnapshotSegment)) {
                 segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
                 currentTimestampUpperLimit = 0;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 0ba9e5f4ca2..378ad205bb6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -85,7 +85,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         return switch (methodName) {
             case "test" -> new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
-                            false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                            false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
             }};
             case "testWithTimer" -> {
                 Timer timer = mock(Timer.class);
@@ -113,39 +113,43 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
                 yield new Object[][]{{
                         new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
-                                false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50),
+                                false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50),
                         tasks
                 }};
             }
             case "testAddWithinTickTime" -> new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
-                            false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                            false, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
             }};
             case "testAddMessageWithStrictDelay" -> new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
-                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
             }};
             case 
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> 
new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
-                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
             }};
             case 
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict", 
"testRecoverSnapshot" ->
                     new Object[][]{{
                             new BucketDelayedDeliveryTracker(dispatcher, 
timer, 100000, clock,
-                                    true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                                    true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
                     }};
             case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict", 
"testExistDelayedMessage" ->
                     new Object[][]{{
                             new BucketDelayedDeliveryTracker(dispatcher, 
timer, 500, clock,
-                                    true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
+                                    true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
                     }};
             case "testMergeSnapshot", "testWithBkException", 
"testWithCreateFailDowngrade" -> new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 
100000, clock,
-                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 10)
+                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 10)
+            }};
+            case "testMaxIndexesPerSegment" -> new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 
100000, clock,
+                            true, bucketSnapshotStorage, 20, 
TimeUnit.HOURS.toMillis(1), 5, 100)
             }};
             default -> new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
-                            true, bucketSnapshotStorage, 1000, 
TimeUnit.MILLISECONDS.toMillis(100), 50)
+                            true, bucketSnapshotStorage, 1000, 
TimeUnit.MILLISECONDS.toMillis(100), -1, 50)
             }};
         };
     }
@@ -196,7 +200,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         clockTime.set(30 * 10);
 
         tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
-                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50);
+                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1,50);
 
         assertFalse(tracker.containsMessage(101, 101));
         assertEquals(tracker.getNumberOfDelayedMessages(), 70);
@@ -268,7 +272,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         tracker.close();
 
         tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
-                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 10);
+                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
 
         assertEquals(tracker.getNumberOfDelayedMessages(), 
delayedMessagesInSnapshot.getValue());
 
@@ -318,7 +322,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         tracker.close();
 
         tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
-                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 10);
+                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
 
         Long delayedMessagesInSnapshotValue = 
delayedMessagesInSnapshot.getValue();
         assertEquals(tracker.getNumberOfDelayedMessages(), 
delayedMessagesInSnapshotValue);
@@ -374,4 +378,17 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
             assertEquals(position, PositionImpl.get(i, i));
         }
     }
+
+    @Test(dataProvider = "delayedTracker")
+    public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker tracker) 
{
+        for (int i = 1; i <= 101; i++) {
+            tracker.addMessage(i, i, i * 10);
+        }
+
+        assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 5);
+
+        tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
+            assertEquals(bucket.getLastSegmentEntryId(), 4);
+        });
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 5d81ba8bc02..fa846779b08 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -32,6 +32,7 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
         
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
         conf.setDelayedDeliveryMaxNumBuckets(10);
         conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
+        conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
         conf.setDelayedDeliveryMinIndexCountPerBucket(50);
         conf.setManagedLedgerMaxEntriesPerLedger(50);
         conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);

Reply via email to