This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9dc490fbeaf [feat][broker][PIP-195] Implement delayed message index
bucket snapshot(merge/delete) - part8 (#19138)
9dc490fbeaf is described below
commit 9dc490fbeaf7aa27265bd0625cb9fab026bbd604
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Jan 17 10:18:53 2023 +0800
[feat][broker][PIP-195] Implement delayed message index bucket
snapshot(merge/delete) - part8 (#19138)
---
.../pulsar/broker/delayed/bucket/Bucket.java | 5 +
.../bucket/BucketDelayedDeliveryTracker.java | 116 +++++++++++++++--
.../bucket/CombinedSegmentDelayedIndexQueue.java | 109 ++++++++++++++++
.../broker/delayed/bucket/DelayedIndexQueue.java | 41 ++++++
.../broker/delayed/bucket/ImmutableBucket.java | 29 ++++-
.../broker/delayed/bucket/MutableBucket.java | 35 +++--
.../TripleLongPriorityDelayedIndexQueue.java | 57 ++++++++
.../BucketDelayedDeliveryTrackerTest.java | 25 +++-
.../delayed/bucket/DelayedIndexQueueTest.java | 143 +++++++++++++++++++++
9 files changed, 526 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index c094d1dee7b..5d2a556337a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -143,4 +143,9 @@ abstract class Bucket {
return executeWithRetry(() -> cursor.putCursorProperty(bucketKey,
String.valueOf(bucketId)),
ManagedLedgerException.BadVersionException.class,
MaxRetryTimes);
}
+
+    /**
+     * Removes the cursor property stored under {@code bucketKey}.
+     *
+     * <p>The removal is submitted through {@code executeWithRetry} with
+     * {@code BadVersionException} and {@code MaxRetryTimes}, mirroring how the
+     * property is written by the method above.
+     *
+     * @param bucketKey name of the cursor property to remove
+     * @return the future produced by {@code executeWithRetry}
+     */
+    protected CompletableFuture<Void> removeBucketCursorProperty(String bucketKey) {
+        return executeWithRetry(
+                () -> cursor.removeCursorProperty(bucketKey),
+                ManagedLedgerException.BadVersionException.class,
+                MaxRetryTimes);
+    }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b402b51ce07..715123487d5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.delayed.bucket;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
@@ -32,14 +33,17 @@ import java.time.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.ThreadSafe;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -71,6 +75,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final TripleLongPriorityQueue sharedBucketPriorityQueue;
+ @Getter
+ @VisibleForTesting
private final RangeMap<Long, ImmutableBucket> immutableBuckets;
private final Table<Long, Long, ImmutableBucket>
snapshotSegmentLastIndexTable;
@@ -105,6 +111,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private synchronized long recoverBucketSnapshot() throws RuntimeException {
ManagedCursor cursor = this.lastMutableBucket.cursor;
+ Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new
ConcurrentHashMap<>();
cursor.getCursorProperties().keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
@@ -112,8 +119,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
ImmutableBucket immutableBucket =
new ImmutableBucket(cursor,
this.lastMutableBucket.bucketSnapshotStorage,
Long.parseLong(keys[1]),
Long.parseLong(keys[2]));
-
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId,
immutableBucket.endLedgerId),
- immutableBucket);
+
putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId,
immutableBucket.endLedgerId),
+ immutableBucket, toBeDeletedBucketMap);
}
});
@@ -122,10 +129,14 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
}
List<CompletableFuture<Void>> futures = new
ArrayList<>(immutableBuckets.asMapOfRanges().size());
- for (ImmutableBucket immutableBucket :
immutableBuckets.asMapOfRanges().values()) {
+ for (Map.Entry<Range<Long>, ImmutableBucket> entry
:immutableBuckets.asMapOfRanges().entrySet()) {
+ Range<Long> key = entry.getKey();
+ ImmutableBucket immutableBucket = entry.getValue();
CompletableFuture<Void> future =
immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime).thenAccept(indexList
-> {
if (CollectionUtils.isEmpty(indexList)) {
+ // Delete bucket snapshot if indexList is empty
+ toBeDeletedBucketMap.put(key, immutableBucket);
return;
}
DelayedIndex lastDelayedIndex =
indexList.get(indexList.size() - 1);
@@ -144,7 +155,12 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
}
try {
- FutureUtil.waitForAll(futures).get(AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS);
+ FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
+ toBeDeletedBucketMap.forEach((k, immutableBucket) -> {
+ immutableBuckets.asMapOfRanges().remove(k);
+ immutableBucket.asyncDeleteBucketSnapshot();
+ });
+ }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
throw new RuntimeException(e);
}
@@ -160,6 +176,26 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return numberDelayedMessages.getValue();
}
+    /**
+     * Registers {@code immutableBucket} under {@code range}, scheduling for deletion every
+     * already-registered bucket whose own ledger range is fully covered by {@code range}.
+     *
+     * <p>The new bucket is registered when nothing overlaps {@code range}, or when at least
+     * one overlapping bucket is fully covered by it. If buckets overlap but none is fully
+     * covered, the new bucket is not registered.
+     *
+     * @param range                ledger range of the new bucket
+     * @param immutableBucket      bucket to register
+     * @param toBeDeletedBucketMap receives the replaced buckets, keyed by their own ledger range
+     */
+    private synchronized void putAndCleanOverlapRange(Range<Long> range, ImmutableBucket immutableBucket,
+            Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap) {
+        Map<Range<Long>, ImmutableBucket> overlapping =
+                immutableBuckets.subRangeMap(range).asMapOfRanges();
+        // Without any overlap the new bucket can always be registered.
+        boolean canPut = overlapping.isEmpty();
+        for (ImmutableBucket existing : overlapping.values()) {
+            // subRangeMap() clips each key to its intersection with `range`, so the clipped
+            // key is always enclosed by `range`. Compare the bucket's real ledger range
+            // instead, and record that range so the later cleanup cannot hit the new entry.
+            Range<Long> existingRange = Range.closed(existing.startLedgerId, existing.endLedgerId);
+            if (range.encloses(existingRange)) {
+                toBeDeletedBucketMap.put(existingRange, existing);
+                canPut = true;
+            }
+        }
+
+        if (canPut) {
+            immutableBuckets.put(range, immutableBucket);
+        }
+    }
+
@Override
public void run(Timeout timeout) throws Exception {
synchronized (this) {
@@ -179,10 +215,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return Optional.ofNullable(immutableBuckets.get(ledgerId));
}
- private void sealBucket() {
- Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
-
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
- this.sharedBucketPriorityQueue);
+ private void afterCreateImmutableBucket(Pair<ImmutableBucket,
DelayedIndex> immutableBucketDelayedIndexPair) {
if (immutableBucketDelayedIndexPair != null) {
ImmutableBucket immutableBucket =
immutableBucketDelayedIndexPair.getLeft();
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId,
immutableBucket.endLedgerId),
@@ -214,11 +247,21 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
- sealBucket();
+ Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
+
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+ this.sharedBucketPriorityQueue);
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
lastMutableBucket.resetLastMutableBucketRange();
if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
- // TODO merge bucket snapshot (synchronize operate)
+ try {
+
asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException(e);
+ }
}
}
@@ -243,6 +286,55 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return true;
}
+    /**
+     * Merges the two neighbouring immutable buckets whose combined
+     * {@code numberBucketDelayedMessages} is the smallest.
+     *
+     * @return the future of the pairwise merge, or an already completed future when fewer
+     *         than two immutable buckets are registered
+     */
+    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
+        List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        if (values.size() < 2) {
+            // No adjacent pair exists, so there is nothing to merge.
+            return CompletableFuture.completedFuture(null);
+        }
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = 0;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            long numberMessages = values.get(i).numberBucketDelayedMessages
+                    + values.get(i + 1).numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                // Keep the full long value: narrowing to int would corrupt the running minimum.
+                minNumberMessages = numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    /**
+     * Merges {@code bucketA} and {@code bucketB} into one immutable bucket spanning
+     * {@code bucketA.startLedgerId} to {@code bucketB.endLedgerId}.
+     *
+     * <p>Both buckets are unregistered immediately. Once their pending snapshot creations have
+     * finished, their remaining snapshot segments are read, combined and turned into a new
+     * immutable bucket. The old snapshots are deleted after the merged snapshot has been
+     * created; that deletion is not awaited by the returned future.
+     *
+     * @param bucketA bucket providing the start of the merged ledger range
+     * @param bucketB bucket providing the end of the merged ledger range
+     * @return future completed once the merged bucket has been created and registered
+     */
+    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                          ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture<Long> snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture<Long> snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenAccept(combinedDelayedIndexQueue -> {
+                        Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+                                lastMutableBucket.createImmutableBucketAndAsyncPersistent(
+                                        timeStepPerBucketSnapshotSegment, sharedBucketPriorityQueue,
+                                        combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId);
+                        afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+
+                        // The pair is null when the combined queue was empty. There is then no
+                        // merged snapshot to wait for, but the old snapshots must still be removed.
+                        CompletableFuture<Long> mergedSnapshotFuture;
+                        if (immutableBucketDelayedIndexPair == null) {
+                            mergedSnapshotFuture = CompletableFuture.completedFuture(null);
+                        } else {
+                            mergedSnapshotFuture = immutableBucketDelayedIndexPair.getLeft()
+                                    .getSnapshotCreateFuture()
+                                    .orElse(CompletableFuture.completedFuture(null));
+                        }
+
+                        // Not awaited by the caller.
+                        mergedSnapshotFuture.thenCompose(___ -> {
+                            CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                            CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+                            return CompletableFuture.allOf(removeAFuture, removeBFuture);
+                        });
+                    });
+        });
+    }
+
@Override
public synchronized boolean hasMessageAvailable() {
long cutoffTime = getCutoffTime();
@@ -299,7 +391,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
removeIndexBit(ledgerId, entryId);
ImmutableBucket bucket =
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
- if (bucket != null) {
+ if (bucket != null &&
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Load next snapshot segment, bucket: {}",
dispatcher.getName(), bucket);
}
@@ -308,6 +400,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
try {
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
+
immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+ bucket.asyncDeleteBucketSnapshot();
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
new file mode 100644
index 00000000000..3f89cc9fdfb
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
+
+ private final List<SnapshotSegment> segmentListA;
+ private final List<SnapshotSegment> segmentListB;
+
+ private int segmentListACursor = 0;
+ private int segmentListBCursor = 0;
+ private int segmentACursor = 0;
+ private int segmentBCursor = 0;
+
+ private CombinedSegmentDelayedIndexQueue(List<SnapshotSegment>
segmentListA,
+ List<SnapshotSegment>
segmentListB) {
+ this.segmentListA = segmentListA;
+ this.segmentListB = segmentListB;
+ }
+
+ public static CombinedSegmentDelayedIndexQueue wrap(
+ List<SnapshotSegment> segmentListA,
+ List<SnapshotSegment> segmentListB) {
+ return new CombinedSegmentDelayedIndexQueue(segmentListA,
segmentListB);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return segmentListACursor >= segmentListA.size() && segmentListBCursor
>= segmentListB.size();
+ }
+
+ @Override
+ public DelayedIndex peek() {
+ return getValue(false);
+ }
+
+ @Override
+ public DelayedIndex pop() {
+ return getValue(true);
+ }
+
+ private DelayedIndex getValue(boolean needAdvanceCursor) {
+ // skip empty segment
+ while (segmentListACursor < segmentListA.size()
+ && segmentListA.get(segmentListACursor).getIndexesCount() ==
0) {
+ segmentListACursor++;
+ }
+ while (segmentListBCursor < segmentListB.size()
+ && segmentListB.get(segmentListBCursor).getIndexesCount() ==
0) {
+ segmentListBCursor++;
+ }
+
+ DelayedIndex delayedIndexA = null;
+ DelayedIndex delayedIndexB = null;
+ if (segmentListACursor >= segmentListA.size()) {
+ delayedIndexB =
segmentListB.get(segmentListBCursor).getIndexes(segmentBCursor);
+ } else if (segmentListBCursor >= segmentListB.size()) {
+ delayedIndexA =
segmentListA.get(segmentListACursor).getIndexes(segmentACursor);
+ } else {
+ delayedIndexA =
segmentListA.get(segmentListACursor).getIndexes(segmentACursor);
+ delayedIndexB =
segmentListB.get(segmentListBCursor).getIndexes(segmentBCursor);
+ }
+
+ DelayedIndex resultValue;
+ if (delayedIndexB == null || (delayedIndexA != null &&
COMPARATOR.compare(delayedIndexA, delayedIndexB) < 0)) {
+ resultValue = delayedIndexA;
+ if (needAdvanceCursor) {
+ if (++segmentACursor >=
segmentListA.get(segmentListACursor).getIndexesCount()) {
+ segmentListA.set(segmentListACursor, null);
+ ++segmentListACursor;
+ segmentACursor = 0;
+ }
+ }
+ } else {
+ resultValue = delayedIndexB;
+ if (needAdvanceCursor) {
+ if (++segmentBCursor >=
segmentListB.get(segmentListBCursor).getIndexesCount()) {
+ segmentListB.set(segmentListBCursor, null);
+ ++segmentListBCursor;
+ segmentBCursor = 0;
+ }
+ }
+ }
+
+ return resultValue;
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
new file mode 100644
index 00000000000..dee476c376e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.Comparator;
+import java.util.Objects;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+
+/**
+ * Minimal queue abstraction over delayed indexes.
+ */
+interface DelayedIndexQueue {
+    /**
+     * Orders indexes by timestamp, then by ledger id, then by entry id.
+     */
+    Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> COMPARATOR = (left, right) -> {
+        if (Objects.equals(left.getTimestamp(), right.getTimestamp())) {
+            if (Objects.equals(left.getLedgerId(), right.getLedgerId())) {
+                return Long.compare(left.getEntryId(), right.getEntryId());
+            }
+            return Long.compare(left.getLedgerId(), right.getLedgerId());
+        }
+        return Long.compare(left.getTimestamp(), right.getTimestamp());
+    };
+
+    /** @return whether the queue has no index left */
+    boolean isEmpty();
+
+    /** @return the head of the queue, without removing it */
+    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
+
+    /** @return the head of the queue, after removing it */
+    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 913893b1753..8348b4999ed 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -24,7 +24,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -90,7 +92,6 @@ class ImmutableBucket extends Bucket {
return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> {
if (nextSegmentEntryId > lastSegmentEntryId) {
- // TODO Delete bucket snapshot
return CompletableFuture.completedFuture(null);
}
@@ -132,12 +133,36 @@ class ImmutableBucket extends Bucket {
this.setNumberBucketDelayedMessages(numberMessages.getValue());
}
+
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
getRemainSnapshotSegment() {
+ return
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(),
currentSegmentEntryId,
+ lastSegmentEntryId);
+ }
+
+ CompletableFuture<Void> asyncDeleteBucketSnapshot() {
+ String bucketKey = bucketKey();
+ long bucketId = getAndUpdateBucketId();
+ return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
+
bucketSnapshotStorage.deleteBucketSnapshot(bucketId)).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.warn("Failed to delete bucket snapshot, bucketId:
{}, bucketKey: {}",
+ bucketId, bucketKey, ex);
+ }
+ });
+ }
+
void clear(boolean delete) {
delayedIndexBitMap.clear();
getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
if (delete) {
snapshotGenerateFuture.cancel(true);
- // TODO delete bucket snapshot
+ try {
+
asyncDeleteBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException(e);
+ }
} else {
try {
snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 36026298269..ad457329c42 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -51,14 +51,19 @@ class MutableBucket extends Bucket implements AutoCloseable
{
Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
long timeStepPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue) {
- if (priorityQueue.isEmpty()) {
+ return
createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
sharedQueue,
+ TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue),
startLedgerId, endLedgerId);
+ }
+
+ Pair<ImmutableBucket, DelayedIndex>
createImmutableBucketAndAsyncPersistent(
+ final long timeStepPerBucketSnapshotSegment,
+ TripleLongPriorityQueue sharedQueue, DelayedIndexQueue
delayedIndexQueue, final long startLedgerId,
+ final long endLedgerId) {
+ if (delayedIndexQueue.isEmpty()) {
return null;
}
long numMessages = 0;
- final long startLedgerId = getStartLedgerId();
- final long endLedgerId = getEndLedgerId();
-
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
@@ -66,14 +71,15 @@ class MutableBucket extends Bucket implements AutoCloseable
{
SnapshotSegmentMetadata.Builder segmentMetadataBuilder =
SnapshotSegmentMetadata.newBuilder();
long currentTimestampUpperLimit = 0;
- while (!priorityQueue.isEmpty()) {
- long timestamp = priorityQueue.peekN1();
+ while (!delayedIndexQueue.isEmpty()) {
+ DelayedIndex delayedIndex = delayedIndexQueue.peek();
+ long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentTimestampUpperLimit = timestamp +
timeStepPerBucketSnapshotSegment - 1;
}
- long ledgerId = priorityQueue.peekN2();
- long entryId = priorityQueue.peekN3();
+ long ledgerId = delayedIndex.getLedgerId();
+ long entryId = delayedIndex.getEntryId();
checkArgument(ledgerId >= startLedgerId && ledgerId <=
endLedgerId);
@@ -82,19 +88,14 @@ class MutableBucket extends Bucket implements AutoCloseable
{
sharedQueue.add(timestamp, ledgerId, entryId);
}
- priorityQueue.pop();
+ delayedIndexQueue.pop();
numMessages++;
- DelayedIndex delayedIndex = DelayedIndex.newBuilder()
- .setTimestamp(timestamp)
- .setLedgerId(ledgerId)
- .setEntryId(entryId).build();
-
bitMap.computeIfAbsent(ledgerId, k -> new
RoaringBitmap()).add(entryId, entryId + 1);
snapshotSegmentBuilder.addIndexes(delayedIndex);
- if (priorityQueue.isEmpty() || priorityQueue.peekN1() >
currentTimestampUpperLimit) {
+ if (delayedIndexQueue.isEmpty() ||
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
currentTimestampUpperLimit = 0;
@@ -136,9 +137,7 @@ class MutableBucket extends Bucket implements AutoCloseable
{
bucketSnapshotMetadata, bucketSnapshotSegments);
bucket.setSnapshotCreateFuture(future);
future.whenComplete((__, ex) -> {
- if (ex == null) {
- bucket.setSnapshotCreateFuture(null);
- } else {
+ if (ex != null) {
//TODO Record create snapshot failed
log.error("Failed to create snapshot: ", ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
new file mode 100644
index 00000000000..b8d54bd78b4
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+
+/**
+ * {@link DelayedIndexQueue} adapter over a {@link TripleLongPriorityQueue}.
+ *
+ * <p>The three longs at the head of the wrapped queue are exposed as the timestamp,
+ * ledger id and entry id of a {@code DelayedIndex}.
+ */
+@NotThreadSafe
+class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final TripleLongPriorityQueue queue;
+
+    private TripleLongPriorityDelayedIndexQueue(TripleLongPriorityQueue queue) {
+        this.queue = queue;
+    }
+
+    /** Wraps {@code queue}; the adapter reads from and pops the given instance directly. */
+    public static TripleLongPriorityDelayedIndexQueue wrap(TripleLongPriorityQueue queue) {
+        return new TripleLongPriorityDelayedIndexQueue(queue);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return queue.isEmpty();
+    }
+
+    /** Builds a {@code DelayedIndex} from the head of the wrapped queue without removing it. */
+    @Override
+    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
+        final long timestamp = queue.peekN1();
+        final long ledgerId = queue.peekN2();
+        final long entryId = queue.peekN3();
+        return DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder()
+                .setTimestamp(timestamp)
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .build();
+    }
+
+    /** Returns the head as a {@code DelayedIndex} and removes it from the wrapped queue. */
+    @Override
+    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
+        final DelayedMessageIndexBucketSnapshotFormat.DelayedIndex head = peek();
+        queue.pop();
+        return head;
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
similarity index 90%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index abcde7902d8..0a2a76ec339 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.delayed;
+package org.apache.pulsar.broker.delayed.bucket;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -42,8 +42,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
-import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
+import org.apache.pulsar.broker.delayed.MockManagedCursor;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -133,6 +135,10 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
new BucketDelayedDeliveryTracker(dispatcher,
timer, 500, clock,
true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
}};
+ case "testMergeSnapshot" -> new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer,
100000, clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 10)
+ }};
default -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
true, bucketSnapshotStorage, 1000,
TimeUnit.MILLISECONDS.toMillis(100), 50)
@@ -235,4 +241,17 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
assertTrue(Arrays.equals(array, array2));
assertNotSame(array, array2);
}
+
+    /**
+     * Adds 110 messages, each with its own ledger id, and checks that all of them are
+     * tracked while the number of immutable buckets ends up at 10.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
+        for (int i = 1; i <= 110; i++) {
+            tracker.addMessage(i, i, i * 10);
+        }
+
+        // TestNG's assertEquals takes (actual, expected); keeping that order makes a
+        // failure message report the right values.
+        // NOTE(review): assumes assertEquals is org.testng.Assert.assertEquals - confirm import.
+        assertEquals(tracker.getNumberOfDelayedMessages(), 110);
+
+        int size = tracker.getImmutableBuckets().asMapOfRanges().size();
+
+        assertEquals(size, 10);
+    }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
new file mode 100644
index 00000000000..865ccb6934a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static
org.apache.pulsar.broker.delayed.bucket.DelayedIndexQueue.COMPARATOR;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link DelayedIndexQueue#COMPARATOR} and the two {@link DelayedIndexQueue}
+ * implementations.
+ */
+@Slf4j
+public class DelayedIndexQueueTest {
+
+    /** Builds a {@link DelayedIndex} with the given timestamp, ledger id and entry id. */
+    private static DelayedIndex newIndex(long timestamp, long ledgerId, long entryId) {
+        return DelayedIndex.newBuilder()
+                .setTimestamp(timestamp)
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .build();
+    }
+
+    /** The comparator orders by timestamp first, then ledger id, then entry id. */
+    @Test
+    public void testCompare() {
+        DelayedIndex delayedIndex = newIndex(1, 1L, 1L);
+        DelayedIndex delayedIndex2 = newIndex(2, 2L, 2L);
+        Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
+
+        delayedIndex = newIndex(1, 1L, 1L);
+        delayedIndex2 = newIndex(1, 2L, 2L);
+        Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
+
+        delayedIndex = newIndex(1, 1L, 1L);
+        delayedIndex2 = newIndex(1, 1L, 2L);
+        Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
+    }
+
+    /**
+     * Drains a combined queue built from two segment lists (one ending in an empty segment)
+     * and checks that the output never decreases and that all 38 indexes are returned.
+     */
+    @Test
+    public void testCombinedSegmentDelayedIndexQueue() {
+        List<DelayedIndex> listA = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            listA.add(newIndex(i, 1L, 1L));
+        }
+        SnapshotSegment snapshotSegmentA1 = SnapshotSegment.newBuilder().addAllIndexes(listA).build();
+
+        List<DelayedIndex> listA2 = new ArrayList<>();
+        for (int i = 10; i < 20; i++) {
+            listA2.add(newIndex(i, 1L, 1L));
+        }
+        SnapshotSegment snapshotSegmentA2 = SnapshotSegment.newBuilder().addAllIndexes(listA2).build();
+
+        List<SnapshotSegment> segmentListA = new ArrayList<>();
+        segmentListA.add(snapshotSegmentA1);
+        segmentListA.add(snapshotSegmentA2);
+
+        List<DelayedIndex> listB = new ArrayList<>();
+        for (int i = 0; i < 9; i++) {
+            listB.add(newIndex(i, 2L, 1L));
+            listB.add(newIndex(i, 2L, 2L));
+        }
+
+        SnapshotSegment snapshotSegmentB = SnapshotSegment.newBuilder().addAllIndexes(listB).build();
+        List<SnapshotSegment> segmentListB = new ArrayList<>();
+        segmentListB.add(snapshotSegmentB);
+        // A trailing empty segment must be skipped by the queue.
+        segmentListB.add(SnapshotSegment.newBuilder().build());
+
+        CombinedSegmentDelayedIndexQueue delayedIndexQueue =
+                CombinedSegmentDelayedIndexQueue.wrap(segmentListA, segmentListB);
+
+        int count = 0;
+        while (!delayedIndexQueue.isEmpty()) {
+            DelayedIndex pop = delayedIndexQueue.pop();
+            log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(), pop.getEntryId());
+            count++;
+            if (!delayedIndexQueue.isEmpty()) {
+                DelayedIndex peek = delayedIndexQueue.peek();
+                Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
+            }
+        }
+        // TestNG's Assert.assertEquals takes (actual, expected).
+        Assert.assertEquals(count, 38);
+    }
+
+    /** Drains a queue backed by a {@link TripleLongPriorityQueue} and checks order and size. */
+    @Test
+    public void TripleLongPriorityDelayedIndexQueueTest() {
+
+        @Cleanup
+        TripleLongPriorityQueue queue = new TripleLongPriorityQueue();
+        for (int i = 0; i < 10; i++) {
+            queue.add(i, 1, 1);
+        }
+
+        TripleLongPriorityDelayedIndexQueue delayedIndexQueue =
+                TripleLongPriorityDelayedIndexQueue.wrap(queue);
+
+        int count = 0;
+        while (!delayedIndexQueue.isEmpty()) {
+            DelayedIndex pop = delayedIndexQueue.pop();
+            count++;
+            if (!delayedIndexQueue.isEmpty()) {
+                DelayedIndex peek = delayedIndexQueue.peek();
+                Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
+            }
+        }
+
+        // TestNG's Assert.assertEquals takes (actual, expected).
+        Assert.assertEquals(count, 10);
+    }
+}