This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dc490fbeaf [feat][broker][PIP-195] Implement delayed message index 
bucket snapshot(merge/delete) - part8 (#19138)
9dc490fbeaf is described below

commit 9dc490fbeaf7aa27265bd0625cb9fab026bbd604
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Jan 17 10:18:53 2023 +0800

    [feat][broker][PIP-195] Implement delayed message index bucket 
snapshot(merge/delete) - part8 (#19138)
---
 .../pulsar/broker/delayed/bucket/Bucket.java       |   5 +
 .../bucket/BucketDelayedDeliveryTracker.java       | 116 +++++++++++++++--
 .../bucket/CombinedSegmentDelayedIndexQueue.java   | 109 ++++++++++++++++
 .../broker/delayed/bucket/DelayedIndexQueue.java   |  41 ++++++
 .../broker/delayed/bucket/ImmutableBucket.java     |  29 ++++-
 .../broker/delayed/bucket/MutableBucket.java       |  35 +++--
 .../TripleLongPriorityDelayedIndexQueue.java       |  57 ++++++++
 .../BucketDelayedDeliveryTrackerTest.java          |  25 +++-
 .../delayed/bucket/DelayedIndexQueueTest.java      | 143 +++++++++++++++++++++
 9 files changed, 526 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index c094d1dee7b..5d2a556337a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -143,4 +143,9 @@ abstract class Bucket {
         return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, 
String.valueOf(bucketId)),
                 ManagedLedgerException.BadVersionException.class, 
MaxRetryTimes);
     }
+
+    protected CompletableFuture<Void> removeBucketCursorProperty(String 
bucketKey) {
+        return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey),
+                ManagedLedgerException.BadVersionException.class, 
MaxRetryTimes);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b402b51ce07..715123487d5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.delayed.bucket;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
@@ -32,14 +33,17 @@ import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.concurrent.ThreadSafe;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -71,6 +75,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final TripleLongPriorityQueue sharedBucketPriorityQueue;
 
+    @Getter
+    @VisibleForTesting
     private final RangeMap<Long, ImmutableBucket> immutableBuckets;
 
     private final Table<Long, Long, ImmutableBucket> 
snapshotSegmentLastIndexTable;
@@ -105,6 +111,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private synchronized long recoverBucketSnapshot() throws RuntimeException {
         ManagedCursor cursor = this.lastMutableBucket.cursor;
+        Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new 
ConcurrentHashMap<>();
         cursor.getCursorProperties().keySet().forEach(key -> {
             if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
                 String[] keys = key.split(DELIMITER);
@@ -112,8 +119,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 ImmutableBucket immutableBucket =
                         new ImmutableBucket(cursor, 
this.lastMutableBucket.bucketSnapshotStorage,
                                 Long.parseLong(keys[1]), 
Long.parseLong(keys[2]));
-                
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId),
-                        immutableBucket);
+                
putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId),
+                        immutableBucket, toBeDeletedBucketMap);
             }
         });
 
@@ -122,10 +129,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         }
 
         List<CompletableFuture<Void>> futures = new 
ArrayList<>(immutableBuckets.asMapOfRanges().size());
-        for (ImmutableBucket immutableBucket : 
immutableBuckets.asMapOfRanges().values()) {
+        for (Map.Entry<Range<Long>, ImmutableBucket> entry 
:immutableBuckets.asMapOfRanges().entrySet()) {
+            Range<Long> key = entry.getKey();
+            ImmutableBucket immutableBucket = entry.getValue();
             CompletableFuture<Void> future =
                     
immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime).thenAccept(indexList
 -> {
                         if (CollectionUtils.isEmpty(indexList)) {
+                            // Delete bucket snapshot if indexList is empty
+                            toBeDeletedBucketMap.put(key, immutableBucket);
                             return;
                         }
                         DelayedIndex lastDelayedIndex = 
indexList.get(indexList.size() - 1);
@@ -144,7 +155,12 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         }
 
         try {
-            FutureUtil.waitForAll(futures).get(AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS);
+            FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
+                toBeDeletedBucketMap.forEach((k, immutableBucket) -> {
+                    immutableBuckets.asMapOfRanges().remove(k);
+                    immutableBucket.asyncDeleteBucketSnapshot();
+                });
+            }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             throw new RuntimeException(e);
         }
@@ -160,6 +176,26 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return numberDelayedMessages.getValue();
     }
 
+    private synchronized void putAndCleanOverlapRange(Range<Long> range, 
ImmutableBucket immutableBucket,
+                                                      Map<Range<Long>, 
ImmutableBucket> toBeDeletedBucketMap) {
+        RangeMap<Long, ImmutableBucket> subRangeMap = 
immutableBuckets.subRangeMap(range);
+        boolean canPut = false;
+        if (!subRangeMap.asMapOfRanges().isEmpty()) {
+            for (Map.Entry<Range<Long>, ImmutableBucket> rangeEntry : 
subRangeMap.asMapOfRanges().entrySet()) {
+                if (range.encloses(rangeEntry.getKey())) {
+                    toBeDeletedBucketMap.put(rangeEntry.getKey(), 
rangeEntry.getValue());
+                    canPut = true;
+                }
+            }
+        } else {
+            canPut = true;
+        }
+
+        if (canPut) {
+            immutableBuckets.put(range, immutableBucket);
+        }
+    }
+
     @Override
     public void run(Timeout timeout) throws Exception {
         synchronized (this) {
@@ -179,10 +215,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return Optional.ofNullable(immutableBuckets.get(ledgerId));
     }
 
-    private void sealBucket() {
-        Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
-                
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
-                        this.sharedBucketPriorityQueue);
+    private void afterCreateImmutableBucket(Pair<ImmutableBucket, 
DelayedIndex> immutableBucketDelayedIndexPair) {
         if (immutableBucketDelayedIndexPair != null) {
             ImmutableBucket immutableBucket = 
immutableBucketDelayedIndexPair.getLeft();
             immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId),
@@ -214,11 +247,21 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
                 && lastMutableBucket.size() >= minIndexCountPerBucket
                 && !lastMutableBucket.isEmpty()) {
-            sealBucket();
+            Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
+                    
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+                            this.sharedBucketPriorityQueue);
+            afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
             lastMutableBucket.resetLastMutableBucketRange();
 
             if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
-                // TODO merge bucket snapshot (synchronize operate)
+                try {
+                    
asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+                } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                    if (e instanceof InterruptedException) {
+                        Thread.currentThread().interrupt();
+                    }
+                    throw new RuntimeException(e);
+                }
             }
         }
 
@@ -243,6 +286,55 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return true;
     }
 
+    private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
+        List<ImmutableBucket> values = 
immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + 
bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), 
values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture<Void> 
asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                          
ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, 
bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, 
bucketB.endLedgerId));
+
+        CompletableFuture<Long> snapshotCreateFutureA =
+                
bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture<Long> snapshotCreateFutureB =
+                
bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, 
snapshotCreateFutureB).thenCompose(__ -> {
+            
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 futureA =
+                    bucketA.getRemainSnapshotSegment();
+            
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, 
CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenAccept(combinedDelayedIndexQueue -> {
+                        Pair<ImmutableBucket, DelayedIndex> 
immutableBucketDelayedIndexPair =
+                                
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
+                                        timeStepPerBucketSnapshotSegment, 
sharedBucketPriorityQueue,
+                                        combinedDelayedIndexQueue, 
bucketA.startLedgerId, bucketB.endLedgerId);
+                        
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+
+                        
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
+                                
.orElse(CompletableFuture.completedFuture(null)).thenCompose(___ -> {
+                                    CompletableFuture<Void> removeAFuture = 
bucketA.asyncDeleteBucketSnapshot();
+                                    CompletableFuture<Void> removeBFuture = 
bucketB.asyncDeleteBucketSnapshot();
+                                    return 
CompletableFuture.allOf(removeAFuture, removeBFuture);
+                                });
+                    });
+        });
+    }
+
     @Override
     public synchronized boolean hasMessageAvailable() {
         long cutoffTime = getCutoffTime();
@@ -299,7 +391,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             removeIndexBit(ledgerId, entryId);
 
             ImmutableBucket bucket = 
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
-            if (bucket != null) {
+            if (bucket != null && 
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Load next snapshot segment, bucket: {}", 
dispatcher.getName(), bucket);
                 }
@@ -308,6 +400,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 try {
                     
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
                         if (CollectionUtils.isEmpty(indexList)) {
+                            
immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+                            bucket.asyncDeleteBucketSnapshot();
                             return;
                         }
                         DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
new file mode 100644
index 00000000000..3f89cc9fdfb
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final List<SnapshotSegment> segmentListA;
+    private final List<SnapshotSegment> segmentListB;
+
+    private int segmentListACursor = 0;
+    private int segmentListBCursor = 0;
+    private int segmentACursor = 0;
+    private int segmentBCursor = 0;
+
+    private CombinedSegmentDelayedIndexQueue(List<SnapshotSegment> 
segmentListA,
+                                             List<SnapshotSegment> 
segmentListB) {
+        this.segmentListA = segmentListA;
+        this.segmentListB = segmentListB;
+    }
+
+    public static CombinedSegmentDelayedIndexQueue wrap(
+            List<SnapshotSegment> segmentListA,
+            List<SnapshotSegment> segmentListB) {
+        return new CombinedSegmentDelayedIndexQueue(segmentListA, 
segmentListB);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return segmentListACursor >= segmentListA.size() && segmentListBCursor 
>= segmentListB.size();
+    }
+
+    @Override
+    public DelayedIndex peek() {
+        return getValue(false);
+    }
+
+    @Override
+    public DelayedIndex pop() {
+        return getValue(true);
+    }
+
+    private DelayedIndex getValue(boolean needAdvanceCursor) {
+        // skip empty segment
+        while (segmentListACursor < segmentListA.size()
+                && segmentListA.get(segmentListACursor).getIndexesCount() == 
0) {
+            segmentListACursor++;
+        }
+        while (segmentListBCursor < segmentListB.size()
+                && segmentListB.get(segmentListBCursor).getIndexesCount() == 
0) {
+            segmentListBCursor++;
+        }
+
+        DelayedIndex delayedIndexA = null;
+        DelayedIndex delayedIndexB = null;
+        if (segmentListACursor >= segmentListA.size()) {
+            delayedIndexB = 
segmentListB.get(segmentListBCursor).getIndexes(segmentBCursor);
+        } else if (segmentListBCursor >= segmentListB.size()) {
+            delayedIndexA = 
segmentListA.get(segmentListACursor).getIndexes(segmentACursor);
+        } else {
+            delayedIndexA = 
segmentListA.get(segmentListACursor).getIndexes(segmentACursor);
+            delayedIndexB = 
segmentListB.get(segmentListBCursor).getIndexes(segmentBCursor);
+        }
+
+        DelayedIndex resultValue;
+        if (delayedIndexB == null || (delayedIndexA != null && 
COMPARATOR.compare(delayedIndexA, delayedIndexB) < 0)) {
+            resultValue = delayedIndexA;
+            if (needAdvanceCursor) {
+                if (++segmentACursor >= 
segmentListA.get(segmentListACursor).getIndexesCount()) {
+                    segmentListA.set(segmentListACursor, null);
+                    ++segmentListACursor;
+                    segmentACursor = 0;
+                }
+            }
+        } else {
+            resultValue = delayedIndexB;
+            if (needAdvanceCursor) {
+                if (++segmentBCursor >= 
segmentListB.get(segmentListBCursor).getIndexesCount()) {
+                    segmentListB.set(segmentListBCursor, null);
+                    ++segmentListBCursor;
+                    segmentBCursor = 0;
+                }
+            }
+        }
+
+        return resultValue;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
new file mode 100644
index 00000000000..dee476c376e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.Comparator;
+import java.util.Objects;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+
+interface DelayedIndexQueue {
+    Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> 
COMPARATOR = (o1, o2) ->  {
+        if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
+            return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+        } else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
+            return Long.compare(o1.getLedgerId(), o2.getLedgerId());
+        } else {
+            return Long.compare(o1.getEntryId(), o2.getEntryId());
+        }
+    };
+
+    boolean isEmpty();
+
+    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
+
+    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 913893b1753..8348b4999ed 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -24,7 +24,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -90,7 +92,6 @@ class ImmutableBucket extends Bucket {
 
             return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> {
                 if (nextSegmentEntryId > lastSegmentEntryId) {
-                    // TODO Delete bucket snapshot
                     return CompletableFuture.completedFuture(null);
                 }
 
@@ -132,12 +133,36 @@ class ImmutableBucket extends Bucket {
         this.setNumberBucketDelayedMessages(numberMessages.getValue());
     }
 
+    
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 getRemainSnapshotSegment() {
+        return 
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), 
currentSegmentEntryId,
+                lastSegmentEntryId);
+    }
+
+    CompletableFuture<Void> asyncDeleteBucketSnapshot() {
+        String bucketKey = bucketKey();
+        long bucketId = getAndUpdateBucketId();
+        return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
+                
bucketSnapshotStorage.deleteBucketSnapshot(bucketId)).whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        log.warn("Failed to delete bucket snapshot, bucketId: 
{}, bucketKey: {}",
+                                bucketId, bucketKey, ex);
+                    }
+        });
+    }
+
     void clear(boolean delete) {
         delayedIndexBitMap.clear();
         getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
             if (delete) {
                 snapshotGenerateFuture.cancel(true);
-                // TODO delete bucket snapshot
+                try {
+                    
asyncDeleteBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+                } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                    if (e instanceof InterruptedException) {
+                        Thread.currentThread().interrupt();
+                    }
+                    throw new RuntimeException(e);
+                }
             } else {
                 try {
                     snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 36026298269..ad457329c42 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -51,14 +51,19 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
     Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
             long timeStepPerBucketSnapshotSegment,
             TripleLongPriorityQueue sharedQueue) {
-        if (priorityQueue.isEmpty()) {
+        return 
createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, 
sharedQueue,
+                TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), 
startLedgerId, endLedgerId);
+    }
+
+    Pair<ImmutableBucket, DelayedIndex> 
createImmutableBucketAndAsyncPersistent(
+            final long timeStepPerBucketSnapshotSegment,
+            TripleLongPriorityQueue sharedQueue, DelayedIndexQueue 
delayedIndexQueue, final long startLedgerId,
+            final long endLedgerId) {
+        if (delayedIndexQueue.isEmpty()) {
             return null;
         }
         long numMessages = 0;
 
-        final long startLedgerId = getStartLedgerId();
-        final long endLedgerId = getEndLedgerId();
-
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
         Map<Long, RoaringBitmap> bitMap = new HashMap<>();
@@ -66,14 +71,15 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
         SnapshotSegmentMetadata.Builder segmentMetadataBuilder = 
SnapshotSegmentMetadata.newBuilder();
 
         long currentTimestampUpperLimit = 0;
-        while (!priorityQueue.isEmpty()) {
-            long timestamp = priorityQueue.peekN1();
+        while (!delayedIndexQueue.isEmpty()) {
+            DelayedIndex delayedIndex = delayedIndexQueue.peek();
+            long timestamp = delayedIndex.getTimestamp();
             if (currentTimestampUpperLimit == 0) {
                 currentTimestampUpperLimit = timestamp + 
timeStepPerBucketSnapshotSegment - 1;
             }
 
-            long ledgerId = priorityQueue.peekN2();
-            long entryId = priorityQueue.peekN3();
+            long ledgerId = delayedIndex.getLedgerId();
+            long entryId = delayedIndex.getEntryId();
 
             checkArgument(ledgerId >= startLedgerId && ledgerId <= 
endLedgerId);
 
@@ -82,19 +88,14 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
                 sharedQueue.add(timestamp, ledgerId, entryId);
             }
 
-            priorityQueue.pop();
+            delayedIndexQueue.pop();
             numMessages++;
 
-            DelayedIndex delayedIndex = DelayedIndex.newBuilder()
-                    .setTimestamp(timestamp)
-                    .setLedgerId(ledgerId)
-                    .setEntryId(entryId).build();
-
             bitMap.computeIfAbsent(ledgerId, k -> new 
RoaringBitmap()).add(entryId, entryId + 1);
 
             snapshotSegmentBuilder.addIndexes(delayedIndex);
 
-            if (priorityQueue.isEmpty() || priorityQueue.peekN1() > 
currentTimestampUpperLimit) {
+            if (delayedIndexQueue.isEmpty() || 
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
                 segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
                 currentTimestampUpperLimit = 0;
 
@@ -136,9 +137,7 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
                 bucketSnapshotMetadata, bucketSnapshotSegments);
         bucket.setSnapshotCreateFuture(future);
         future.whenComplete((__, ex) -> {
-            if (ex == null) {
-                bucket.setSnapshotCreateFuture(null);
-            } else {
+            if (ex != null) {
                 //TODO Record create snapshot failed
                 log.error("Failed to create snapshot: ", ex);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
new file mode 100644
index 00000000000..b8d54bd78b4
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+
+@NotThreadSafe
+class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final TripleLongPriorityQueue queue;
+
+    private TripleLongPriorityDelayedIndexQueue(TripleLongPriorityQueue queue) 
{
+        this.queue = queue;
+    }
+
+    public static TripleLongPriorityDelayedIndexQueue 
wrap(TripleLongPriorityQueue queue) {
+        return new TripleLongPriorityDelayedIndexQueue(queue);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return queue.isEmpty();
+    }
+
+    @Override
+    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+                
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1())
+                        
.setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build();
+        return delayedIndex;
+    }
+
+    @Override
+    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek();
+        queue.pop();
+        return peek;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
similarity index 90%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index abcde7902d8..0a2a76ec339 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.delayed;
+package org.apache.pulsar.broker.delayed.bucket;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -42,8 +42,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
-import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import org.apache.pulsar.broker.delayed.AbstractDeliveryTrackerTest;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.MockBucketSnapshotStorage;
+import org.apache.pulsar.broker.delayed.MockManagedCursor;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -133,6 +135,10 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
                             new BucketDelayedDeliveryTracker(dispatcher, 
timer, 500, clock,
                                     true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 50)
                     }};
+            case "testMergeSnapshot" -> new Object[][]{{
+                    new BucketDelayedDeliveryTracker(dispatcher, timer, 
100000, clock,
+                            true, bucketSnapshotStorage, 5, 
TimeUnit.MILLISECONDS.toMillis(10), 10)
+            }};
             default -> new Object[][]{{
                     new BucketDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
                             true, bucketSnapshotStorage, 1000, 
TimeUnit.MILLISECONDS.toMillis(100), 50)
@@ -235,4 +241,17 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         assertTrue(Arrays.equals(array, array2));
         assertNotSame(array, array2);
     }
+
+    @Test(dataProvider = "delayedTracker")
+    public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
+        for (int i = 1; i <= 110; i++) {
+            tracker.addMessage(i, i, i * 10);
+        }
+
+        assertEquals(110, tracker.getNumberOfDelayedMessages());
+
+        int size = tracker.getImmutableBuckets().asMapOfRanges().size();
+
+        assertEquals(10, size);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
new file mode 100644
index 00000000000..865ccb6934a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static 
org.apache.pulsar.broker.delayed.bucket.DelayedIndexQueue.COMPARATOR;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DelayedIndexQueueTest {
+
+    @Test
+    public void testCompare() {
+        DelayedIndex delayedIndex =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+                        .build();
+        DelayedIndex delayedIndex2 =
+                
DelayedIndex.newBuilder().setTimestamp(2).setLedgerId(2L).setEntryId(2L)
+                        .build();
+        Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
+
+        delayedIndex =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+                        .build();
+        delayedIndex2 =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(2L).setEntryId(2L)
+                        .build();
+        Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
+
+        delayedIndex =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+                        .build();
+        delayedIndex2 =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(2L)
+                        .build();
+        Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
+    }
+
+    @Test
+    public void testCombinedSegmentDelayedIndexQueue() {
+        List<DelayedIndex> listA = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            DelayedIndex delayedIndex =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
+                            .build();
+            listA.add(delayedIndex);
+        }
+        SnapshotSegment snapshotSegmentA1 = 
SnapshotSegment.newBuilder().addAllIndexes(listA).build();
+
+        List<DelayedIndex> listA2 = new ArrayList<>();
+        for (int i = 10; i < 20; i++) {
+            DelayedIndex delayedIndex =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
+                            .build();
+            listA2.add(delayedIndex);
+        }
+        SnapshotSegment snapshotSegmentA2 = 
SnapshotSegment.newBuilder().addAllIndexes(listA2).build();
+
+        List<SnapshotSegment> segmentListA = new ArrayList<>();
+        segmentListA.add(snapshotSegmentA1);
+        segmentListA.add(snapshotSegmentA2);
+
+        List<DelayedIndex> listB = new ArrayList<>();
+        for (int i = 0; i < 9; i++) {
+            DelayedIndex delayedIndex =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
+                            .build();
+
+            DelayedIndex delayedIndex2 =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
+                            .build();
+            listB.add(delayedIndex);
+            listB.add(delayedIndex2);
+        }
+
+        SnapshotSegment snapshotSegmentB = 
SnapshotSegment.newBuilder().addAllIndexes(listB).build();
+        List<SnapshotSegment> segmentListB = new ArrayList<>();
+        segmentListB.add(snapshotSegmentB);
+        segmentListB.add(SnapshotSegment.newBuilder().build());
+
+        CombinedSegmentDelayedIndexQueue delayedIndexQueue =
+                CombinedSegmentDelayedIndexQueue.wrap(segmentListA, 
segmentListB);
+
+        int count = 0;
+        while (!delayedIndexQueue.isEmpty()) {
+            DelayedIndex pop = delayedIndexQueue.pop();
+            log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(), 
pop.getEntryId());
+            count++;
+            if (!delayedIndexQueue.isEmpty()) {
+                DelayedIndex peek = delayedIndexQueue.peek();
+                Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
+            }
+        }
+        Assert.assertEquals(38, count);
+    }
+
+    @Test
+    public void TripleLongPriorityDelayedIndexQueueTest() {
+
+        @Cleanup
+        TripleLongPriorityQueue queue = new TripleLongPriorityQueue();
+        for (int i = 0; i < 10; i++) {
+            queue.add(i, 1, 1);
+        }
+
+        TripleLongPriorityDelayedIndexQueue delayedIndexQueue = 
TripleLongPriorityDelayedIndexQueue.wrap(queue);
+
+        int count = 0;
+        while (!delayedIndexQueue.isEmpty()) {
+            DelayedIndex pop = delayedIndexQueue.pop();
+            count++;
+            if (!delayedIndexQueue.isEmpty()) {
+                DelayedIndex peek = delayedIndexQueue.peek();
+                Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
+            }
+        }
+
+        Assert.assertEquals(10, count);
+    }
+}


Reply via email to