This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dad5791bdd [fix][broker] Fix BucketDelayedDeliveryTracker merge 
issues (#19615)
7dad5791bdd is described below

commit 7dad5791bddd25bbfc4b154056ac723bb5d64ede
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Mar 1 09:13:04 2023 +0800

    [fix][broker] Fix BucketDelayedDeliveryTracker merge issues (#19615)
---
 .../BucketDelayedDeliveryTrackerFactory.java       |  3 +-
 .../bucket/BucketDelayedDeliveryTracker.java       | 64 ++++++++++++++++------
 .../broker/delayed/bucket/ImmutableBucket.java     | 22 ++++++--
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  9 +++
 .../persistent/BucketDelayedDeliveryTest.java      |  5 ++
 5 files changed, 80 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 16648d84e9f..ae9cb23ceb9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -49,6 +49,7 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
     public void initialize(PulsarService pulsarService) throws Exception {
         ServiceConfiguration config = pulsarService.getConfig();
         bucketSnapshotStorage = new 
BookkeeperBucketSnapshotStorage(pulsarService);
+        bucketSnapshotStorage.start();
         this.timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-delayed-delivery"),
                 config.getDelayedDeliveryTickTimeMillis(), 
TimeUnit.MILLISECONDS);
         this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
@@ -63,7 +64,7 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
     public DelayedDeliveryTracker 
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
         return new BucketDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
                 bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
-                delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds,
+                
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
                 delayedDeliveryMaxNumBuckets);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 77c1dfb1eea..bd4ef92cc73 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -55,6 +55,7 @@ import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.RoaringBitmap;
 
 @Slf4j
 @ThreadSafe
@@ -64,7 +65,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final long minIndexCountPerBucket;
 
-    private final long timeStepPerBucketSnapshotSegment;
+    private final long timeStepPerBucketSnapshotSegmentInMillis;
 
     private final int maxNumBuckets;
 
@@ -84,21 +85,21 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                                  Timer timer, long tickTimeMillis,
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
                                  BucketSnapshotStorage bucketSnapshotStorage,
-                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegment,
+                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
                                  int maxNumBuckets) {
         this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
-                bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegment, maxNumBuckets);
+                bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
     }
 
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
                                  Timer timer, long tickTimeMillis, Clock clock,
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
                                  BucketSnapshotStorage bucketSnapshotStorage,
-                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegment,
+                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
                                  int maxNumBuckets) {
         super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.minIndexCountPerBucket = minIndexCountPerBucket;
-        this.timeStepPerBucketSnapshotSegment = 
timeStepPerBucketSnapshotSegment;
+        this.timeStepPerBucketSnapshotSegmentInMillis = 
timeStepPerBucketSnapshotSegmentInMillis;
         this.maxNumBuckets = maxNumBuckets;
         this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
         this.immutableBuckets = TreeRangeMap.create();
@@ -255,7 +256,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 && lastMutableBucket.size() >= minIndexCountPerBucket
                 && !lastMutableBucket.isEmpty()) {
             Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
-                    
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+                    
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
                             this.sharedBucketPriorityQueue);
             afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
             lastMutableBucket.resetLastMutableBucketRange();
@@ -303,17 +304,21 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             long numberMessages = bucketL.numberBucketDelayedMessages + 
bucketR.numberBucketDelayedMessages;
             if (numberMessages < minNumberMessages) {
                 minNumberMessages = (int) numberMessages;
-                minIndex = i;
+                if (bucketL.lastSegmentEntryId > 
bucketL.getCurrentSegmentEntryId()) {
+                    minIndex = i;
+                }
             }
         }
+
+        if (minIndex == -1) {
+            log.warn("[{}] Can't find able merged bucket", 
dispatcher.getName());
+            return CompletableFuture.completedFuture(null);
+        }
         return asyncMergeBucketSnapshot(values.get(minIndex), 
values.get(minIndex + 1));
     }
 
     private synchronized CompletableFuture<Void> 
asyncMergeBucketSnapshot(ImmutableBucket bucketA,
                                                                           
ImmutableBucket bucketB) {
-        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, 
bucketA.endLedgerId));
-        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, 
bucketB.endLedgerId));
-
         CompletableFuture<Long> snapshotCreateFutureA =
                 
bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
         CompletableFuture<Long> snapshotCreateFutureB =
@@ -328,16 +333,41 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     .thenAccept(combinedDelayedIndexQueue -> {
                         Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
                                 
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
-                                        timeStepPerBucketSnapshotSegment, 
sharedBucketPriorityQueue,
+                                        
timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue,
                                         combinedDelayedIndexQueue, 
bucketA.startLedgerId, bucketB.endLedgerId);
+
+                        // Merge bit map to new bucket
+                        Map<Long, RoaringBitmap> delayedIndexBitMapA = 
bucketA.getDelayedIndexBitMap();
+                        Map<Long, RoaringBitmap> delayedIndexBitMapB = 
bucketB.getDelayedIndexBitMap();
+                        Map<Long, RoaringBitmap> delayedIndexBitMap = new 
HashMap<>(delayedIndexBitMapA);
+                        delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
+                            delayedIndexBitMap.compute(ledgerId, (k, bitMapA) 
-> {
+                                if (bitMapA == null) {
+                                    return bitMapB;
+                                }
+
+                                bitMapA.or(bitMapB);
+                                return bitMapA;
+                            });
+                        });
+                        
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
+
                         
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
 
-                        
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
-                                
.orElse(CompletableFuture.completedFuture(null)).thenCompose(___ -> {
-                                    CompletableFuture<Void> removeAFuture = 
bucketA.asyncDeleteBucketSnapshot();
-                                    CompletableFuture<Void> removeBFuture = 
bucketB.asyncDeleteBucketSnapshot();
-                                    return 
CompletableFuture.allOf(removeAFuture, removeBFuture);
-                                });
+                        CompletableFuture<Long> snapshotCreateFuture = 
CompletableFuture.completedFuture(null);
+                        if (immutableBucketDelayedIndexPair != null) {
+                            snapshotCreateFuture = 
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
+                                    
.orElse(CompletableFuture.completedFuture(null));
+                        }
+
+                        snapshotCreateFuture.thenCompose(___ -> {
+                            CompletableFuture<Void> removeAFuture = 
bucketA.asyncDeleteBucketSnapshot();
+                            CompletableFuture<Void> removeBFuture = 
bucketB.asyncDeleteBucketSnapshot();
+                            return CompletableFuture.allOf(removeAFuture, 
removeBFuture);
+                        });
+
+                        
immutableBuckets.remove(Range.closed(bucketA.startLedgerId, 
bucketA.endLedgerId));
+                        
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, 
bucketB.endLedgerId));
                     });
         });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 8348b4999ed..3e9c577454f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -24,9 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -134,7 +132,11 @@ class ImmutableBucket extends Bucket {
     }
 
     
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 getRemainSnapshotSegment() {
-        return 
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), 
currentSegmentEntryId,
+        int nextSegmentEntryId = currentSegmentEntryId + 1;
+        if (nextSegmentEntryId > lastSegmentEntryId) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+        return 
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), 
nextSegmentEntryId,
                 lastSegmentEntryId);
     }
 
@@ -155,9 +157,19 @@ class ImmutableBucket extends Bucket {
         getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
             if (delete) {
                 snapshotGenerateFuture.cancel(true);
+                String bucketKey = bucketKey();
+                long bucketId = getAndUpdateBucketId();
                 try {
-                    
asyncDeleteBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
-                } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                    // Because bucketSnapshotStorage.deleteBucketSnapshot may 
be use the same thread with clear,
+                    // so we can't block deleteBucketSnapshot when clearing 
the bucket snapshot.
+                    removeBucketCursorProperty(bucketKey())
+                            .thenApply(__ -> 
bucketSnapshotStorage.deleteBucketSnapshot(bucketId).exceptionally(ex -> {
+                                log.error("Failed to delete bucket snapshot, 
bucketId: {}, bucketKey: {}",
+                                        bucketId, bucketKey, ex);
+                                return null;
+                            })).get(AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    log.error("Failed to delete bucket snapshot, bucketId: {}, 
bucketKey: {}", bucketId, bucketKey, e);
                     if (e instanceof InterruptedException) {
                         Thread.currentThread().interrupt();
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 0a2a76ec339..920f2cf2b64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -36,6 +36,7 @@ import java.time.Clock;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -253,5 +254,13 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         int size = tracker.getImmutableBuckets().asMapOfRanges().size();
 
         assertEquals(10, size);
+
+        clockTime.set(110 * 10);
+
+        NavigableSet<PositionImpl> scheduledMessages = 
tracker.getScheduledMessages(110);
+        for (int i = 1; i <= 110; i++) {
+            PositionImpl position = scheduledMessages.pollFirst();
+            assertEquals(position, PositionImpl.get(i, i));
+        }
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index a54c0ce794c..5d81ba8bc02 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -30,6 +30,11 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
     @Override
     public void setup() throws Exception {
         
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
+        conf.setDelayedDeliveryMaxNumBuckets(10);
+        conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
+        conf.setDelayedDeliveryMinIndexCountPerBucket(50);
+        conf.setManagedLedgerMaxEntriesPerLedger(50);
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         super.setup();
     }
 

Reply via email to