This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c4aec6661e7 [fix][broker] Ensure previous delayed index be removed
from snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)
c4aec6661e7 is described below
commit c4aec6661e795c46181dc1fa79282065fa875768
Author: Cong Zhao <[email protected]>
AuthorDate: Sun Apr 16 16:05:44 2023 +0800
[fix][broker] Ensure previous delayed index be removed from
snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)
---
.../bucket/BucketDelayedDeliveryTracker.java | 106 ++++++++++++---------
.../PersistentDispatcherMultipleConsumers.java | 13 +--
.../bucket/BucketDelayedDeliveryTrackerTest.java | 30 ++++--
3 files changed, 85 insertions(+), 64 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 6ead1e207b0..6678c6df254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.delayed.bucket;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
-import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
@@ -84,7 +83,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final int maxNumBuckets;
- private long numberDelayedMessages;
+ private volatile long numberDelayedMessages;
@Getter
@VisibleForTesting
@@ -102,6 +101,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final BucketDelayedMessageIndexStats stats;
+ private CompletableFuture<Void> pendingLoad = null;
+
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -269,7 +270,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
immutableBucket.asyncUpdateSnapshotLength();
- log.info("[{}] Creat bucket snapshot finish,
bucketKey: {}", dispatcher.getName(),
+ log.info("[{}] Create bucket snapshot finish,
bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
@@ -529,17 +530,25 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
}
@Override
- public synchronized long getNumberOfDelayedMessages() {
+ public long getNumberOfDelayedMessages() {
return numberDelayedMessages;
}
@Override
- public synchronized long getBufferMemoryUsage() {
+ public long getBufferMemoryUsage() {
return this.lastMutableBucket.getBufferMemoryUsage() +
sharedBucketPriorityQueue.bytesCapacity();
}
@Override
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int
maxMessages) {
+ if (!checkPendingOpDone()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skip getScheduledMessages to wait for bucket
snapshot load finish.",
+ dispatcher.getName());
+ }
+ return Collections.emptyNavigableSet();
+ }
+
long cutoffTime = getCutoffTime();
lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime,
sharedBucketPriorityQueue);
@@ -558,6 +567,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
ImmutableBucket bucket =
snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null &&
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+ // All messages of the current snapshot segment are scheduled; try to load the next snapshot segment
if (bucket.merging) {
log.info("[{}] Skip load to wait for bucket snapshot merge
finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
@@ -569,26 +579,19 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
log.debug("[{}] Loading next bucket snapshot segment,
bucketKey: {}, nextSegmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(),
preSegmentEntryId + 1);
}
- // All message of current snapshot segment are scheduled, load
next snapshot segment
- // TODO make it asynchronous and not blocking this process
- try {
- boolean createFutureDone =
bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
-
- if (!createFutureDone) {
- log.info("[{}] Skip load to wait for bucket snapshot
create finish, bucketKey:{}",
- dispatcher.getName(), bucket.bucketKey());
- break;
- }
-
- if (bucket.currentSegmentEntryId ==
bucket.lastSegmentEntryId) {
-
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId,
bucket.endLedgerId));
- bucket.asyncDeleteBucketSnapshot(stats);
- continue;
- }
+ boolean createFutureDone =
bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
+ if (!createFutureDone) {
+ log.info("[{}] Skip load to wait for bucket snapshot
create finish, bucketKey:{}",
+ dispatcher.getName(), bucket.bucketKey());
+ break;
+ }
- long loadStartTime = System.currentTimeMillis();
-
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
-
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
+ long loadStartTime = System.currentTimeMillis();
+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
+ CompletableFuture<Void> loadFuture = pendingLoad =
bucket.asyncLoadNextBucketSnapshotEntry()
+ .thenAccept(indexList -> {
+ synchronized (BucketDelayedDeliveryTracker.this) {
+ this.snapshotSegmentLastIndexTable.remove(ledgerId,
entryId);
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId,
bucket.endLedgerId));
@@ -603,31 +606,36 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
- }).whenComplete((__, ex) -> {
- if (ex != null) {
- // Back bucket state
- bucket.setCurrentSegmentEntryId(preSegmentEntryId);
-
- log.error("[{}] Failed to load bucket snapshot
segment, bucketKey: {}, segmentEntryId: {}",
- dispatcher.getName(), bucket.bucketKey(),
preSegmentEntryId + 1, ex);
-
-
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
- } else {
- log.info("[{}] Load next bucket snapshot segment
finish, bucketKey: {}, segmentEntryId: {}",
- dispatcher.getName(), bucket.bucketKey(),
preSegmentEntryId + 1);
-
-
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
- System.currentTimeMillis() -
loadStartTime);
+ }
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ // Roll back the bucket state so this segment is reloaded on the next schedule
+ bucket.setCurrentSegmentEntryId(preSegmentEntryId);
+
+ log.error("[{}] Failed to load bucket snapshot
segment, bucketKey: {}, segmentEntryId: {}",
+ dispatcher.getName(), bucket.bucketKey(),
preSegmentEntryId + 1, ex);
+
+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
+ } else {
+ log.info("[{}] Load next bucket snapshot segment
finish, bucketKey: {}, segmentEntryId: {}",
+ dispatcher.getName(), bucket.bucketKey(),
+ (preSegmentEntryId ==
bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1);
+
+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
+ System.currentTimeMillis() - loadStartTime);
+ }
+ synchronized (this) {
+ if (timeout != null) {
+ timeout.cancel();
}
- }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1),
TimeUnit.SECONDS);
- } catch (Exception e) {
- // Ignore exception to reload this segment on the next
schedule.
- log.error("[{}] An exception occurs when load next bucket
snapshot, bucketKey:{}",
- dispatcher.getName(), bucket.bucketKey(), e);
+ timeout = timer.newTimeout(this, tickTimeMillis,
TimeUnit.MILLISECONDS);
+ }
+ });
+
+ if (!checkPendingOpDone() ||
loadFuture.isCompletedExceptionally()) {
break;
}
}
- snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
positions.add(new PositionImpl(ledgerId, entryId));
@@ -643,6 +651,14 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return positions;
}
+ private synchronized boolean checkPendingOpDone() {
+ if (pendingLoad == null || pendingLoad.isDone()) {
+ pendingLoad = null;
+ return true;
+ }
+ return false;
+ }
+
@Override
public boolean shouldPauseAllDeliveries() {
return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c60b4562bf1..81adda053e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -46,7 +46,6 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -1089,7 +1088,7 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
@Override
- public synchronized long getNumberOfDelayedMessages() {
+ public long getNumberOfDelayedMessages() {
return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}
@@ -1169,15 +1168,7 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
public long getDelayedTrackerMemoryUsage() {
- if (delayedDeliveryTracker.isEmpty()) {
- return 0;
- }
-
- if (delayedDeliveryTracker.get() instanceof
AbstractDelayedDeliveryTracker) {
- return delayedDeliveryTracker.get().getBufferMemoryUsage();
- }
-
- return 0;
+ return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}
public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 95234d688f6..39b3992fbd1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -39,6 +39,7 @@ import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -197,9 +198,11 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
});
assertTrue(tracker.hasMessageAvailable());
- Set<PositionImpl> scheduledMessages =
tracker.getScheduledMessages(100);
-
- assertEquals(scheduledMessages.size(), 1);
+ Set<PositionImpl> scheduledMessages = new TreeSet<>();
+ Awaitility.await().untilAsserted(() -> {
+ scheduledMessages.addAll(tracker.getScheduledMessages(100));
+ assertEquals(scheduledMessages.size(), 1);
+ });
tracker.addMessage(101, 101, 101 * 10);
@@ -216,12 +219,15 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
clockTime.set(100 * 10);
assertTrue(tracker2.hasMessageAvailable());
- scheduledMessages = tracker2.getScheduledMessages(70);
+ Set<PositionImpl> scheduledMessages2 = new TreeSet<>();
- assertEquals(scheduledMessages.size(), 70);
+ Awaitility.await().untilAsserted(() -> {
+ scheduledMessages2.addAll(tracker2.getScheduledMessages(70));
+ assertEquals(scheduledMessages2.size(), 70);
+ });
int i = 31;
- for (PositionImpl scheduledMessage : scheduledMessages) {
+ for (PositionImpl scheduledMessage : scheduledMessages2) {
assertEquals(scheduledMessage, PositionImpl.get(i, i));
i++;
}
@@ -298,7 +304,11 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
clockTime.set(110 * 10);
- NavigableSet<PositionImpl> scheduledMessages =
tracker2.getScheduledMessages(110);
+ NavigableSet<PositionImpl> scheduledMessages = new TreeSet<>();
+ Awaitility.await().untilAsserted(() -> {
+ scheduledMessages.addAll(tracker2.getScheduledMessages(110));
+ assertEquals(scheduledMessages.size(), 110);
+ });
for (int i = 1; i <= 110; i++) {
PositionImpl position = scheduledMessages.pollFirst();
assertEquals(position, PositionImpl.get(i, i));
@@ -370,7 +380,11 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
assertEquals(tracker2.getScheduledMessages(100).size(), 0);
- assertEquals(tracker2.getScheduledMessages(100).size(),
delayedMessagesInSnapshotValue);
+ Set<PositionImpl> scheduledMessages = new TreeSet<>();
+ Awaitility.await().untilAsserted(() -> {
+ scheduledMessages.addAll(tracker2.getScheduledMessages(100));
+ assertEquals(scheduledMessages.size(),
delayedMessagesInSnapshotValue);
+ });
assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());