This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c4aec6661e7 [fix][broker] Ensure previous delayed index be removed 
from snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)
c4aec6661e7 is described below

commit c4aec6661e795c46181dc1fa79282065fa875768
Author: Cong Zhao <[email protected]>
AuthorDate: Sun Apr 16 16:05:44 2023 +0800

    [fix][broker] Ensure previous delayed index be removed from 
snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)
---
 .../bucket/BucketDelayedDeliveryTracker.java       | 106 ++++++++++++---------
 .../PersistentDispatcherMultipleConsumers.java     |  13 +--
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  30 ++++--
 3 files changed, 85 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 6ead1e207b0..6678c6df254 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.delayed.bucket;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
-import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Range;
@@ -84,7 +83,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final int maxNumBuckets;
 
-    private long numberDelayedMessages;
+    private volatile long numberDelayedMessages;
 
     @Getter
     @VisibleForTesting
@@ -102,6 +101,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final BucketDelayedMessageIndexStats stats;
 
+    private CompletableFuture<Void> pendingLoad = null;
+
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
                                  Timer timer, long tickTimeMillis,
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -269,7 +270,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     if (ex == null) {
                         immutableBucket.setSnapshotSegments(null);
                         immutableBucket.asyncUpdateSnapshotLength();
-                        log.info("[{}] Creat bucket snapshot finish, 
bucketKey: {}", dispatcher.getName(),
+                        log.info("[{}] Create bucket snapshot finish, 
bucketKey: {}", dispatcher.getName(),
                                 immutableBucket.bucketKey());
 
                         
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
@@ -529,17 +530,25 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
     }
 
     @Override
-    public synchronized long getNumberOfDelayedMessages() {
+    public long getNumberOfDelayedMessages() {
         return numberDelayedMessages;
     }
 
     @Override
-    public synchronized long getBufferMemoryUsage() {
+    public long getBufferMemoryUsage() {
         return this.lastMutableBucket.getBufferMemoryUsage() + 
sharedBucketPriorityQueue.bytesCapacity();
     }
 
     @Override
     public synchronized NavigableSet<PositionImpl> getScheduledMessages(int 
maxMessages) {
+        if (!checkPendingOpDone()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skip getScheduledMessages to wait for bucket 
snapshot load finish.",
+                        dispatcher.getName());
+            }
+            return Collections.emptyNavigableSet();
+        }
+
         long cutoffTime = getCutoffTime();
 
         lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, 
sharedBucketPriorityQueue);
@@ -558,6 +567,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
             ImmutableBucket bucket = 
snapshotSegmentLastIndexTable.get(ledgerId, entryId);
             if (bucket != null && 
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+                // All message of current snapshot segment are scheduled, try 
load next snapshot segment
                 if (bucket.merging) {
                     log.info("[{}] Skip load to wait for bucket snapshot merge 
finish, bucketKey:{}",
                             dispatcher.getName(), bucket.bucketKey());
@@ -569,26 +579,19 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     log.debug("[{}] Loading next bucket snapshot segment, 
bucketKey: {}, nextSegmentEntryId: {}",
                             dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1);
                 }
-                // All message of current snapshot segment are scheduled, load 
next snapshot segment
-                // TODO make it asynchronous and not blocking this process
-                try {
-                    boolean createFutureDone = 
bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
-
-                    if (!createFutureDone) {
-                        log.info("[{}] Skip load to wait for bucket snapshot 
create finish, bucketKey:{}",
-                                dispatcher.getName(), bucket.bucketKey());
-                        break;
-                    }
-
-                    if (bucket.currentSegmentEntryId == 
bucket.lastSegmentEntryId) {
-                        
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, 
bucket.endLedgerId));
-                        bucket.asyncDeleteBucketSnapshot(stats);
-                        continue;
-                    }
+                boolean createFutureDone = 
bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
+                if (!createFutureDone) {
+                    log.info("[{}] Skip load to wait for bucket snapshot 
create finish, bucketKey:{}",
+                            dispatcher.getName(), bucket.bucketKey());
+                    break;
+                }
 
-                    long loadStartTime = System.currentTimeMillis();
-                    
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
-                    
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
+                long loadStartTime = System.currentTimeMillis();
+                
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
+                CompletableFuture<Void> loadFuture = pendingLoad = 
bucket.asyncLoadNextBucketSnapshotEntry()
+                        .thenAccept(indexList -> {
+                    synchronized (BucketDelayedDeliveryTracker.this) {
+                        this.snapshotSegmentLastIndexTable.remove(ledgerId, 
entryId);
                         if (CollectionUtils.isEmpty(indexList)) {
                             immutableBuckets.asMapOfRanges()
                                     .remove(Range.closed(bucket.startLedgerId, 
bucket.endLedgerId));
@@ -603,31 +606,36 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                             
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
                                     index.getEntryId());
                         }
-                    }).whenComplete((__, ex) -> {
-                        if (ex != null) {
-                            // Back bucket state
-                            bucket.setCurrentSegmentEntryId(preSegmentEntryId);
-
-                            log.error("[{}] Failed to load bucket snapshot 
segment, bucketKey: {}, segmentEntryId: {}",
-                                    dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1, ex);
-
-                            
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
-                        } else {
-                            log.info("[{}] Load next bucket snapshot segment 
finish, bucketKey: {}, segmentEntryId: {}",
-                                    dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1);
-
-                            
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
-                                    System.currentTimeMillis() - 
loadStartTime);
+                    }
+                }).whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        // Back bucket state
+                        bucket.setCurrentSegmentEntryId(preSegmentEntryId);
+
+                        log.error("[{}] Failed to load bucket snapshot 
segment, bucketKey: {}, segmentEntryId: {}",
+                                dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1, ex);
+
+                        
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
+                    } else {
+                        log.info("[{}] Load next bucket snapshot segment 
finish, bucketKey: {}, segmentEntryId: {}",
+                                dispatcher.getName(), bucket.bucketKey(),
+                                (preSegmentEntryId == 
bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1);
+
+                        
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
+                                System.currentTimeMillis() - loadStartTime);
+                    }
+                    synchronized (this) {
+                        if (timeout != null) {
+                            timeout.cancel();
                         }
-                    }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), 
TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    // Ignore exception to reload this segment on the next 
schedule.
-                    log.error("[{}] An exception occurs when load next bucket 
snapshot, bucketKey:{}",
-                            dispatcher.getName(), bucket.bucketKey(), e);
+                        timeout = timer.newTimeout(this, tickTimeMillis, 
TimeUnit.MILLISECONDS);
+                    }
+                });
+
+                if (!checkPendingOpDone() || 
loadFuture.isCompletedExceptionally()) {
                     break;
                 }
             }
-            snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
 
             positions.add(new PositionImpl(ledgerId, entryId));
 
@@ -643,6 +651,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return positions;
     }
 
+    private synchronized boolean checkPendingOpDone() {
+        if (pendingLoad == null || pendingLoad.isDone()) {
+            pendingLoad = null;
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public boolean shouldPauseAllDeliveries() {
         return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c60b4562bf1..81adda053e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -46,7 +46,6 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -1089,7 +1088,7 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
     }
 
     @Override
-    public synchronized long getNumberOfDelayedMessages() {
+    public long getNumberOfDelayedMessages() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
 
@@ -1169,15 +1168,7 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 
 
     public long getDelayedTrackerMemoryUsage() {
-        if (delayedDeliveryTracker.isEmpty()) {
-            return 0;
-        }
-
-        if (delayedDeliveryTracker.get() instanceof 
AbstractDelayedDeliveryTracker) {
-            return delayedDeliveryTracker.get().getBufferMemoryUsage();
-        }
-
-        return 0;
+        return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
     }
 
     public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 95234d688f6..39b3992fbd1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -39,6 +39,7 @@ import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -197,9 +198,11 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         });
 
         assertTrue(tracker.hasMessageAvailable());
-        Set<PositionImpl> scheduledMessages = 
tracker.getScheduledMessages(100);
-
-        assertEquals(scheduledMessages.size(), 1);
+        Set<PositionImpl> scheduledMessages = new TreeSet<>();
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages.addAll(tracker.getScheduledMessages(100));
+            assertEquals(scheduledMessages.size(), 1);
+        });
 
         tracker.addMessage(101, 101, 101 * 10);
 
@@ -216,12 +219,15 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
         clockTime.set(100 * 10);
 
         assertTrue(tracker2.hasMessageAvailable());
-        scheduledMessages = tracker2.getScheduledMessages(70);
+        Set<PositionImpl> scheduledMessages2 = new TreeSet<>();
 
-        assertEquals(scheduledMessages.size(), 70);
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages2.addAll(tracker2.getScheduledMessages(70));
+            assertEquals(scheduledMessages2.size(), 70);
+        });
 
         int i = 31;
-        for (PositionImpl scheduledMessage : scheduledMessages) {
+        for (PositionImpl scheduledMessage : scheduledMessages2) {
             assertEquals(scheduledMessage, PositionImpl.get(i, i));
             i++;
         }
@@ -298,7 +304,11 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         clockTime.set(110 * 10);
 
-        NavigableSet<PositionImpl> scheduledMessages = 
tracker2.getScheduledMessages(110);
+        NavigableSet<PositionImpl> scheduledMessages = new TreeSet<>();
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages.addAll(tracker2.getScheduledMessages(110));
+            assertEquals(scheduledMessages.size(), 110);
+        });
         for (int i = 1; i <= 110; i++) {
             PositionImpl position = scheduledMessages.pollFirst();
             assertEquals(position, PositionImpl.get(i, i));
@@ -370,7 +380,11 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
 
         assertEquals(tracker2.getScheduledMessages(100).size(), 0);
 
-        assertEquals(tracker2.getScheduledMessages(100).size(), 
delayedMessagesInSnapshotValue);
+        Set<PositionImpl> scheduledMessages = new TreeSet<>();
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages.addAll(tracker2.getScheduledMessages(100));
+            assertEquals(scheduledMessages.size(), 
delayedMessagesInSnapshotValue);
+        });
 
         assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
         
assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());

Reply via email to