This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8081ee26d8d [improve][broker][PIP-195] Make bucket merge operation asynchronous (#19873)
8081ee26d8d is described below

commit 8081ee26d8d3727c720800a3453a798893763fee
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Mar 25 15:39:37 2023 +0800

    [improve][broker][PIP-195] Make bucket merge operation asynchronous (#19873)
---
 .../bucket/BucketDelayedDeliveryTracker.java       | 139 +++++++++++++--------
 .../broker/delayed/bucket/ImmutableBucket.java     |  17 ++-
 .../broker/delayed/bucket/MutableBucket.java       |   6 +
 .../DelayedMessageIndexBucketSnapshotFormat.proto  |   1 +
 .../BookkeeperBucketSnapshotStorageTest.java       |   3 +
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  57 +++++++--
 6 files changed, 154 insertions(+), 69 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 31fdaa6fb76..1cf8bd2b20d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -90,6 +90,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final Table<Long, Long, ImmutableBucket> 
snapshotSegmentLastIndexTable;
 
+    private static final Long INVALID_BUCKET_ID = -1L;
+
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
                                  Timer timer, long tickTimeMillis,
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -246,12 +248,12 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     immutableBucket);
 
             immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture 
-> {
-                CompletableFuture<Long> future = 
createFuture.whenComplete((__, ex) -> {
+                CompletableFuture<Long> future = 
createFuture.handle((bucketId, ex) -> {
                     if (ex == null) {
                         immutableBucket.setSnapshotSegments(null);
                         log.info("[{}] Creat bucket snapshot finish, 
bucketKey: {}", dispatcher.getName(),
                                 immutableBucket.bucketKey());
-                        return;
+                        return bucketId;
                     }
 
                     //TODO Record create snapshot failed
@@ -277,6 +279,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                         
snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(),
                                 lastDelayedIndex.getTimestamp());
                     }
+                    return INVALID_BUCKET_ID;
                 });
                 immutableBucket.setSnapshotCreateFuture(future);
             });
@@ -308,12 +311,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             lastMutableBucket.resetLastMutableBucketRange();
 
             if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
-                try {
-                    asyncMergeBucketSnapshot().get(2 * 
AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    // Ignore exception to merge bucket on the next schedule.
-                    log.error("[{}] An exception occurs when merge bucket 
snapshot.", dispatcher.getName(), e);
-                }
+                asyncMergeBucketSnapshot();
             }
         }
 
@@ -341,18 +339,26 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
     private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
         List<ImmutableBucket> values = 
immutableBuckets.asMapOfRanges().values().stream().toList();
         long minNumberMessages = Long.MAX_VALUE;
+        long minScheduleTimestamp = Long.MAX_VALUE;
         int minIndex = -1;
         for (int i = 0; i + 1 < values.size(); i++) {
             ImmutableBucket bucketL = values.get(i);
             ImmutableBucket bucketR = values.get(i + 1);
-            long numberMessages = bucketL.numberBucketDelayedMessages + 
bucketR.numberBucketDelayedMessages;
-            if (numberMessages < minNumberMessages) {
-                minNumberMessages = (int) numberMessages;
-                if (bucketL.lastSegmentEntryId > 
bucketL.getCurrentSegmentEntryId()
-                        && bucketR.lastSegmentEntryId > 
bucketR.getCurrentSegmentEntryId()
-                        && 
bucketL.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()
-                        && 
bucketR.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) {
-                    minIndex = i;
+            // We should skip the bucket which last segment already been load 
to memory, avoid record replicated index.
+            if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
+                    && bucketR.lastSegmentEntryId > 
bucketR.getCurrentSegmentEntryId()
+                    // Skip the bucket that is merging
+                    && !bucketL.merging && !bucketR.merging){
+                long scheduleTimestamp =
+                        
Math.min(bucketL.firstScheduleTimestamps.get(bucketL.currentSegmentEntryId + 1),
+                                
bucketR.firstScheduleTimestamps.get(bucketR.currentSegmentEntryId + 1));
+                long numberMessages = bucketL.numberBucketDelayedMessages + 
bucketR.numberBucketDelayedMessages;
+                if (scheduleTimestamp <= minScheduleTimestamp) {
+                    minScheduleTimestamp = scheduleTimestamp;
+                    if (numberMessages < minNumberMessages) {
+                        minNumberMessages = numberMessages;
+                        minIndex = i;
+                    }
                 }
             }
         }
@@ -369,7 +375,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             log.info("[{}] Merging bucket snapshot, bucketAKey: {}, 
bucketBKey: {}", dispatcher.getName(),
                     immutableBucketA.bucketKey(), 
immutableBucketB.bucketKey());
         }
+
+        immutableBucketA.merging = true;
+        immutableBucketB.merging = true;
         return asyncMergeBucketSnapshot(immutableBucketA, 
immutableBucketB).whenComplete((__, ex) -> {
+            synchronized (this) {
+                immutableBucketA.merging = false;
+                immutableBucketB.merging = false;
+            }
             if (ex != null) {
                 log.error("[{}] Failed to merge bucket snapshot, bucketAKey: 
{}, bucketBKey: {}",
                         dispatcher.getName(), immutableBucketA.bucketKey(), 
immutableBucketB.bucketKey(), ex);
@@ -382,46 +395,58 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private synchronized CompletableFuture<Void> 
asyncMergeBucketSnapshot(ImmutableBucket bucketA,
                                                                           
ImmutableBucket bucketB) {
-        
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 futureA =
-                bucketA.getRemainSnapshotSegment();
-        
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 futureB =
-                bucketB.getRemainSnapshotSegment();
-        return futureA.thenCombine(futureB, 
CombinedSegmentDelayedIndexQueue::wrap)
-                .thenAccept(combinedDelayedIndexQueue -> {
-                    Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
-                            
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
-                                    timeStepPerBucketSnapshotSegmentInMillis, 
maxIndexesPerBucketSnapshotSegment,
-                                    sharedBucketPriorityQueue, 
combinedDelayedIndexQueue, bucketA.startLedgerId,
-                                    bucketB.endLedgerId);
-
-                    // Merge bit map to new bucket
-                    Map<Long, RoaringBitmap> delayedIndexBitMapA = 
bucketA.getDelayedIndexBitMap();
-                    Map<Long, RoaringBitmap> delayedIndexBitMapB = 
bucketB.getDelayedIndexBitMap();
-                    Map<Long, RoaringBitmap> delayedIndexBitMap = new 
HashMap<>(delayedIndexBitMapA);
-                    delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
-                        delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> {
-                            if (bitMapA == null) {
-                                return bitMapB;
-                            }
+        CompletableFuture<Long> createAFuture = 
bucketA.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
+        CompletableFuture<Long> createBFuture = 
bucketB.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
 
-                            bitMapA.or(bitMapB);
-                            return bitMapA;
-                        });
-                    });
-                    
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
-
-                    
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+        return CompletableFuture.allOf(createAFuture, 
createBFuture).thenCompose(bucketId -> {
+            if (INVALID_BUCKET_ID.equals(createAFuture.join()) || 
INVALID_BUCKET_ID.equals(createBFuture.join())) {
+                return FutureUtil.failedFuture(new RuntimeException("Can't 
merge buckets due to bucket create failed"));
+            }
 
-                    
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
-                            .orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
-                        CompletableFuture<Void> removeAFuture = 
bucketA.asyncDeleteBucketSnapshot();
-                        CompletableFuture<Void> removeBFuture = 
bucketB.asyncDeleteBucketSnapshot();
-                        return CompletableFuture.allOf(removeAFuture, 
removeBFuture);
+            
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 futureA =
+                    bucketA.getRemainSnapshotSegment();
+            
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, 
CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenAccept(combinedDelayedIndexQueue -> {
+                        synchronized (BucketDelayedDeliveryTracker.this) {
+                            Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
+                                    
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
+                                            
timeStepPerBucketSnapshotSegmentInMillis,
+                                            maxIndexesPerBucketSnapshotSegment,
+                                            sharedBucketPriorityQueue, 
combinedDelayedIndexQueue, bucketA.startLedgerId,
+                                            bucketB.endLedgerId);
+
+                            // Merge bit map to new bucket
+                            Map<Long, RoaringBitmap> delayedIndexBitMapA = 
bucketA.getDelayedIndexBitMap();
+                            Map<Long, RoaringBitmap> delayedIndexBitMapB = 
bucketB.getDelayedIndexBitMap();
+                            Map<Long, RoaringBitmap> delayedIndexBitMap = new 
HashMap<>(delayedIndexBitMapA);
+                            delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> 
{
+                                delayedIndexBitMap.compute(ledgerId, (k, 
bitMapA) -> {
+                                    if (bitMapA == null) {
+                                        return bitMapB;
+                                    }
+
+                                    bitMapA.or(bitMapB);
+                                    return bitMapA;
+                                });
+                            });
+                            
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
+
+                            
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+
+                            
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
+                                    .orElse(NULL_LONG_PROMISE).thenCompose(___ 
-> {
+                                        CompletableFuture<Void> removeAFuture 
= bucketA.asyncDeleteBucketSnapshot();
+                                        CompletableFuture<Void> removeBFuture 
= bucketB.asyncDeleteBucketSnapshot();
+                                        return 
CompletableFuture.allOf(removeAFuture, removeBFuture);
+                                    });
+
+                            
immutableBuckets.remove(Range.closed(bucketA.startLedgerId, 
bucketA.endLedgerId));
+                            
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, 
bucketB.endLedgerId));
+                        }
                     });
-
-                    
immutableBuckets.remove(Range.closed(bucketA.startLedgerId, 
bucketA.endLedgerId));
-                    
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, 
bucketB.endLedgerId));
-                });
+        });
     }
 
     @Override
@@ -477,6 +502,12 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
             ImmutableBucket bucket = 
snapshotSegmentLastIndexTable.get(ledgerId, entryId);
             if (bucket != null && 
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+                if (bucket.merging) {
+                    log.info("[{}] Skip load to wait for bucket snapshot merge 
finish, bucketKey:{}",
+                            dispatcher.getName(), bucket.bucketKey());
+                    break;
+                }
+
                 final int preSegmentEntryId = bucket.currentSegmentEntryId;
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Loading next bucket snapshot segment, 
bucketKey: {}, nextSegmentEntryId: {}",
@@ -525,7 +556,6 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                                     dispatcher.getName(), bucket.bucketKey(), 
bucket.currentSegmentEntryId);
                         }
                     }).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, 
TimeUnit.SECONDS);
-                    snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
                 } catch (Exception e) {
                     // Ignore exception to reload this segment on the next 
schedule.
                     log.error("[{}] An exception occurs when load next bucket 
snapshot, bucketKey:{}",
@@ -533,6 +563,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     break;
                 }
             }
+            snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
 
             positions.add(new PositionImpl(ledgerId, entryId));
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index c9223efa092..ab1c285011d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
 import static 
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
 import static 
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
 import com.google.protobuf.ByteString;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +45,12 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 class ImmutableBucket extends Bucket {
 
     @Setter
-    private volatile 
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
+    private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
snapshotSegments;
+
+    boolean merging = false;
+
+    @Setter
+    List<Long> firstScheduleTimestamps = new ArrayList<>();
 
     ImmutableBucket(String dispatcherName, ManagedCursor cursor,
                     BucketSnapshotStorage storage, long startLedgerId, long 
endLedgerId) {
@@ -92,6 +98,9 @@ class ImmutableBucket extends Bucket {
 
                         this.setLastSegmentEntryId(metadataList.size());
                         
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
+                        List<Long> firstScheduleTimestamps = 
metadataList.stream().map(
+                                        
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+                        
this.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
                         return nextSnapshotEntryIndex + 1;
                     });
@@ -157,8 +166,10 @@ class ImmutableBucket extends Bucket {
             return 
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), 
nextSegmentEntryId,
                     lastSegmentEntryId).whenComplete((__, ex) -> {
                 if (ex != null) {
-                    log.warn("[{}] Failed to get remain bucket snapshot 
segment, bucketKey: {}.",
-                            dispatcherName, bucketKey(), ex);
+                    log.warn(
+                            "[{}] Failed to get remain bucket snapshot 
segment, bucketKey: {},"
+                                    + " nextSegmentEntryId: {}, 
lastSegmentEntryId: {}",
+                            dispatcherName, bucketKey(), nextSegmentEntryId, 
lastSegmentEntryId, ex);
                 }
             });
         }, BucketSnapshotPersistenceException.class, MaxRetryTimes);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index e743f39e692..1577bf8fa51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -75,11 +75,15 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
         SnapshotSegment.Builder snapshotSegmentBuilder = 
SnapshotSegment.newBuilder();
         SnapshotSegmentMetadata.Builder segmentMetadataBuilder = 
SnapshotSegmentMetadata.newBuilder();
 
+        List<Long> firstScheduleTimestamps = new ArrayList<>();
         long currentTimestampUpperLimit = 0;
+        long currentFirstTimestamp = 0L;
         while (!delayedIndexQueue.isEmpty()) {
             DelayedIndex delayedIndex = delayedIndexQueue.peek();
             long timestamp = delayedIndex.getTimestamp();
             if (currentTimestampUpperLimit == 0) {
+                currentFirstTimestamp = timestamp;
+                firstScheduleTimestamps.add(currentFirstTimestamp);
                 currentTimestampUpperLimit = timestamp + 
timeStepPerBucketSnapshotSegment - 1;
             }
 
@@ -104,6 +108,7 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
                     || (maxIndexesPerBucketSnapshotSegment != -1
                     && snapshotSegmentBuilder.getIndexesCount() >= 
maxIndexesPerBucketSnapshotSegment)) {
                 segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
+                
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
                 currentTimestampUpperLimit = 0;
 
                 Iterator<Map.Entry<Long, RoaringBitmap>> iterator = 
bitMap.entrySet().iterator();
@@ -134,6 +139,7 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
         bucket.setCurrentSegmentEntryId(1);
         bucket.setNumberBucketDelayedMessages(numMessages);
         bucket.setLastSegmentEntryId(lastSegmentEntryId);
+        bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
         // Skip first segment, because it has already been loaded
         List<SnapshotSegment> snapshotSegments = 
bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());
diff --git 
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto 
b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
index 8414a583fe5..6996b860c52 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
@@ -31,6 +31,7 @@ message DelayedIndex {
 message SnapshotSegmentMetadata {
     map<uint64, bytes> delayed_index_bit_map = 1;
     required uint64 max_schedule_timestamp = 2;
+    required uint64 min_schedule_timestamp = 3;
 }
 
 message SnapshotSegment {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 72052d22b85..a628b58e10d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -73,6 +73,7 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
     public void testGetSnapshot() throws ExecutionException, 
InterruptedException {
         DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
                 
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+                        .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
 
@@ -122,6 +123,7 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
                 
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
                         .setMaxScheduleTimestamp(timeMillis)
+                        .setMinScheduleTimestamp(timeMillis)
                         .putAllDelayedIndexBitMap(map).build();
 
         DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
@@ -172,6 +174,7 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
     public void testGetBucketSnapshotLength() throws ExecutionException, 
InterruptedException {
         DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
                 
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+                        .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 74101f00b96..0d53c278fd2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -52,6 +52,7 @@ import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableLong;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -251,18 +252,28 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
     }
 
     @Test(dataProvider = "delayedTracker")
-    public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
+    public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) {
         for (int i = 1; i <= 110; i++) {
             tracker.addMessage(i, i, i * 10);
+            Awaitility.await().untilAsserted(() -> {
+                Assert.assertTrue(
+                        
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> 
x.merging));
+            });
         }
 
         assertEquals(110, tracker.getNumberOfDelayedMessages());
 
         int size = tracker.getImmutableBuckets().asMapOfRanges().size();
 
-        assertEquals(10, size);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(10, size);
+        });
 
         tracker.addMessage(111, 1011, 111 * 10);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(
+                    
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> 
x.merging));
+        });
 
         MutableLong delayedMessagesInSnapshot = new MutableLong();
         tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> {
@@ -271,26 +282,28 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         tracker.close();
 
-        tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
-                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
+        BucketDelayedDeliveryTracker tracker2 = new 
BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
+                true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1, 10);
 
-        assertEquals(tracker.getNumberOfDelayedMessages(), 
delayedMessagesInSnapshot.getValue());
+        assertEquals(tracker2.getNumberOfDelayedMessages(), 
delayedMessagesInSnapshot.getValue());
 
         for (int i = 1; i <= 110; i++) {
-            tracker.addMessage(i, i, i * 10);
+            tracker2.addMessage(i, i, i * 10);
         }
 
         clockTime.set(110 * 10);
 
-        NavigableSet<PositionImpl> scheduledMessages = 
tracker.getScheduledMessages(110);
+        NavigableSet<PositionImpl> scheduledMessages = 
tracker2.getScheduledMessages(110);
         for (int i = 1; i <= 110; i++) {
             PositionImpl position = scheduledMessages.pollFirst();
             assertEquals(position, PositionImpl.get(i, i));
         }
+
+        tracker2.close();
     }
 
     @Test(dataProvider = "delayedTracker")
-    public void testWithBkException(BucketDelayedDeliveryTracker tracker) {
+    public void testWithBkException(final BucketDelayedDeliveryTracker 
tracker) {
         MockBucketSnapshotStorage mockBucketSnapshotStorage = 
(MockBucketSnapshotStorage) bucketSnapshotStorage;
         mockBucketSnapshotStorage.injectCreateException(
                 new BucketSnapshotPersistenceException("Bookie operation 
timeout, op: Create entry"));
@@ -308,11 +321,25 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         for (int i = 1; i <= 110; i++) {
             tracker.addMessage(i, i, i * 10);
+            Awaitility.await().untilAsserted(() -> {
+                Assert.assertTrue(
+                        
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> 
x.merging));
+            });
         }
 
         assertEquals(110, tracker.getNumberOfDelayedMessages());
 
+        int size = tracker.getImmutableBuckets().asMapOfRanges().size();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(10, size);
+        });
+
         tracker.addMessage(111, 1011, 111 * 10);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(
+                    
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> 
x.merging));
+        });
 
         MutableLong delayedMessagesInSnapshot = new MutableLong();
         tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> {
@@ -321,11 +348,11 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         tracker.close();
 
-        tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, 
clock,
+        BucketDelayedDeliveryTracker tracker2 = new 
BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
                 true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
 
         Long delayedMessagesInSnapshotValue = 
delayedMessagesInSnapshot.getValue();
-        assertEquals(tracker.getNumberOfDelayedMessages(), 
delayedMessagesInSnapshotValue);
+        assertEquals(tracker2.getNumberOfDelayedMessages(), 
delayedMessagesInSnapshotValue);
 
         clockTime.set(110 * 10);
 
@@ -338,14 +365,16 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         mockBucketSnapshotStorage.injectGetSegmentException(
                 new BucketSnapshotPersistenceException("Bookie operation 
timeout4, op: Get entry"));
 
-        assertEquals(tracker.getScheduledMessages(100).size(), 0);
+        assertEquals(tracker2.getScheduledMessages(100).size(), 0);
 
-        assertEquals(tracker.getScheduledMessages(100).size(), 
delayedMessagesInSnapshotValue);
+        assertEquals(tracker2.getScheduledMessages(100).size(), 
delayedMessagesInSnapshotValue);
 
         assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
         
assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());
         
assertTrue(mockBucketSnapshotStorage.getSegmentExceptionQueue.isEmpty());
         assertTrue(mockBucketSnapshotStorage.deleteExceptionQueue.isEmpty());
+
+        tracker2.close();
     }
 
     @Test(dataProvider = "delayedTracker")
@@ -390,6 +419,8 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
             assertEquals(bucket.getLastSegmentEntryId(), 4);
         });
+
+        tracker.close();
     }
     
     @Test(dataProvider = "delayedTracker")
@@ -408,5 +439,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
       assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
       assertEquals(tracker.getLastMutableBucket().size(), 0);
       assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0);
+
+      tracker.close();
     }
 }

Reply via email to