This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7dad5791bdd [fix][broker] Fix BucketDelayedDeliveryTracker merge issues (#19615)
7dad5791bdd is described below
commit 7dad5791bddd25bbfc4b154056ac723bb5d64ede
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Mar 1 09:13:04 2023 +0800
[fix][broker] Fix BucketDelayedDeliveryTracker merge issues (#19615)
---
.../BucketDelayedDeliveryTrackerFactory.java | 3 +-
.../bucket/BucketDelayedDeliveryTracker.java | 64 ++++++++++++++++------
.../broker/delayed/bucket/ImmutableBucket.java | 22 ++++++--
.../bucket/BucketDelayedDeliveryTrackerTest.java | 9 +++
.../persistent/BucketDelayedDeliveryTest.java | 5 ++
5 files changed, 80 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 16648d84e9f..ae9cb23ceb9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -49,6 +49,7 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
public void initialize(PulsarService pulsarService) throws Exception {
ServiceConfiguration config = pulsarService.getConfig();
bucketSnapshotStorage = new
BookkeeperBucketSnapshotStorage(pulsarService);
+ bucketSnapshotStorage.start();
this.timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(),
TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
@@ -63,7 +64,7 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
public DelayedDeliveryTracker
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer,
tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
- delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds,
+
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxNumBuckets);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 77c1dfb1eea..bd4ef92cc73 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -55,6 +55,7 @@ import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.RoaringBitmap;
@Slf4j
@ThreadSafe
@@ -64,7 +65,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final long minIndexCountPerBucket;
- private final long timeStepPerBucketSnapshotSegment;
+ private final long timeStepPerBucketSnapshotSegmentInMillis;
private final int maxNumBuckets;
@@ -84,21 +85,21 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
- long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegment,
+ long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict,
- bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegment, maxNumBuckets);
+ bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
}
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
- long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegment,
+ long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
- this.timeStepPerBucketSnapshotSegment =
timeStepPerBucketSnapshotSegment;
+ this.timeStepPerBucketSnapshotSegmentInMillis =
timeStepPerBucketSnapshotSegmentInMillis;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
@@ -255,7 +256,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
-
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
this.sharedBucketPriorityQueue);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
lastMutableBucket.resetLastMutableBucketRange();
@@ -303,17 +304,21 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
long numberMessages = bucketL.numberBucketDelayedMessages +
bucketR.numberBucketDelayedMessages;
if (numberMessages < minNumberMessages) {
minNumberMessages = (int) numberMessages;
- minIndex = i;
+ if (bucketL.lastSegmentEntryId >
bucketL.getCurrentSegmentEntryId()) {
+ minIndex = i;
+ }
}
}
+
+ if (minIndex == -1) {
+ log.warn("[{}] Can't find able merged bucket",
dispatcher.getName());
+ return CompletableFuture.completedFuture(null);
+ }
return asyncMergeBucketSnapshot(values.get(minIndex),
values.get(minIndex + 1));
}
private synchronized CompletableFuture<Void>
asyncMergeBucketSnapshot(ImmutableBucket bucketA,
ImmutableBucket bucketB) {
- immutableBuckets.remove(Range.closed(bucketA.startLedgerId,
bucketA.endLedgerId));
- immutableBuckets.remove(Range.closed(bucketB.startLedgerId,
bucketB.endLedgerId));
-
CompletableFuture<Long> snapshotCreateFutureA =
bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
CompletableFuture<Long> snapshotCreateFutureB =
@@ -328,16 +333,41 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
- timeStepPerBucketSnapshotSegment,
sharedBucketPriorityQueue,
+
timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue,
combinedDelayedIndexQueue,
bucketA.startLedgerId, bucketB.endLedgerId);
+
+ // Merge bit map to new bucket
+ Map<Long, RoaringBitmap> delayedIndexBitMapA =
bucketA.getDelayedIndexBitMap();
+ Map<Long, RoaringBitmap> delayedIndexBitMapB =
bucketB.getDelayedIndexBitMap();
+ Map<Long, RoaringBitmap> delayedIndexBitMap = new
HashMap<>(delayedIndexBitMapA);
+ delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
+ delayedIndexBitMap.compute(ledgerId, (k, bitMapA)
-> {
+ if (bitMapA == null) {
+ return bitMapB;
+ }
+
+ bitMapA.or(bitMapB);
+ return bitMapA;
+ });
+ });
+
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
+
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
-
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
-
.orElse(CompletableFuture.completedFuture(null)).thenCompose(___ -> {
- CompletableFuture<Void> removeAFuture =
bucketA.asyncDeleteBucketSnapshot();
- CompletableFuture<Void> removeBFuture =
bucketB.asyncDeleteBucketSnapshot();
- return
CompletableFuture.allOf(removeAFuture, removeBFuture);
- });
+ CompletableFuture<Long> snapshotCreateFuture =
CompletableFuture.completedFuture(null);
+ if (immutableBucketDelayedIndexPair != null) {
+ snapshotCreateFuture =
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
+
.orElse(CompletableFuture.completedFuture(null));
+ }
+
+ snapshotCreateFuture.thenCompose(___ -> {
+ CompletableFuture<Void> removeAFuture =
bucketA.asyncDeleteBucketSnapshot();
+ CompletableFuture<Void> removeBFuture =
bucketB.asyncDeleteBucketSnapshot();
+ return CompletableFuture.allOf(removeAFuture,
removeBFuture);
+ });
+
+
immutableBuckets.remove(Range.closed(bucketA.startLedgerId,
bucketA.endLedgerId));
+
immutableBuckets.remove(Range.closed(bucketB.startLedgerId,
bucketB.endLedgerId));
});
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 8348b4999ed..3e9c577454f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -24,9 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -134,7 +132,11 @@ class ImmutableBucket extends Bucket {
}
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
getRemainSnapshotSegment() {
- return
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(),
currentSegmentEntryId,
+ int nextSegmentEntryId = currentSegmentEntryId + 1;
+ if (nextSegmentEntryId > lastSegmentEntryId) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ return
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(),
nextSegmentEntryId,
lastSegmentEntryId);
}
@@ -155,9 +157,19 @@ class ImmutableBucket extends Bucket {
getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
if (delete) {
snapshotGenerateFuture.cancel(true);
+ String bucketKey = bucketKey();
+ long bucketId = getAndUpdateBucketId();
try {
-
asyncDeleteBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ // bucketSnapshotStorage.deleteBucketSnapshot may run on the same thread as clear,
+ // so we must not block on deleteBucketSnapshot while clearing the bucket snapshot.
+ removeBucketCursorProperty(bucketKey())
+ .thenApply(__ ->
bucketSnapshotStorage.deleteBucketSnapshot(bucketId).exceptionally(ex -> {
+ log.error("Failed to delete bucket snapshot,
bucketId: {}, bucketKey: {}",
+ bucketId, bucketKey, ex);
+ return null;
+ })).get(AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to delete bucket snapshot, bucketId: {},
bucketKey: {}", bucketId, bucketKey, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 0a2a76ec339..920f2cf2b64 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -36,6 +36,7 @@ import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -253,5 +254,13 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
int size = tracker.getImmutableBuckets().asMapOfRanges().size();
assertEquals(10, size);
+
+ clockTime.set(110 * 10);
+
+ NavigableSet<PositionImpl> scheduledMessages =
tracker.getScheduledMessages(110);
+ for (int i = 1; i <= 110; i++) {
+ PositionImpl position = scheduledMessages.pollFirst();
+ assertEquals(position, PositionImpl.get(i, i));
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index a54c0ce794c..5d81ba8bc02 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -30,6 +30,11 @@ public class BucketDelayedDeliveryTest extends
DelayedDeliveryTest {
@Override
public void setup() throws Exception {
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
+ conf.setDelayedDeliveryMaxNumBuckets(10);
+ conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
+ conf.setDelayedDeliveryMinIndexCountPerBucket(50);
+ conf.setManagedLedgerMaxEntriesPerLedger(50);
+ conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
super.setup();
}