This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e6f4aaea84d [feat][broker][PIP-195] Implement delayed message index
bucket snapshot (create/load) - part2 (#17611)
e6f4aaea84d is described below
commit e6f4aaea84d027cea3016683815cc4e1a890cf88
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Nov 5 09:50:19 2022 +0800
[feat][broker][PIP-195] Implement delayed message index bucket snapshot
(create/load) - part2 (#17611)
---
.../apache/bookkeeper/mledger/util/Futures.java | 7 +-
.../mledger/impl/ManagedCursorPropertiesTest.java | 6 +-
.../delayed/AbstractDelayedDeliveryTracker.java | 162 ++++++++
.../delayed/InMemoryDelayedDeliveryTracker.java | 161 ++------
.../pulsar/broker/delayed/bucket/Bucket.java | 145 ++++++++
.../bucket/BucketDelayedDeliveryTracker.java | 324 ++++++++++++++++
.../{ => bucket}/BucketSnapshotStorage.java | 2 +-
.../broker/delayed/bucket/ImmutableBucket.java | 101 +++++
.../broker/delayed/bucket/MutableBucket.java | 203 ++++++++++
.../broker/delayed/bucket/package-info.java} | 27 +-
.../PersistentDispatcherMultipleConsumers.java | 4 +
.../DelayedMessageIndexBucketSnapshotFormat.proto | 4 +-
.../delayed/AbstractDeliveryTrackerTest.java | 240 ++++++++++++
.../delayed/BucketDelayedDeliveryTrackerTest.java | 159 ++++++++
.../delayed/InMemoryDeliveryTrackerTest.java | 412 +++++----------------
.../broker/delayed/MockBucketSnapshotStorage.java | 160 ++++++++
.../pulsar/broker/delayed/MockManagedCursor.java | 412 +++++++++++++++++++++
17 files changed, 2039 insertions(+), 490 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 637de8a79fc..dc1d1eb6c9a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -71,14 +71,15 @@ public class Futures {
}
public static <T> CompletableFuture<T>
executeWithRetry(Supplier<CompletableFuture<T>> op,
- Class<? extends
Exception> needRetryExceptionClass) {
+ Class<? extends
Exception> needRetryExceptionClass,
+ int maxRetryTimes)
{
CompletableFuture<T> resultFuture = new CompletableFuture<>();
op.get().whenComplete((res, ex) -> {
if (ex == null) {
resultFuture.complete(res);
} else {
- if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
- executeWithRetry(op,
needRetryExceptionClass).whenComplete((res2, ex2) -> {
+ if (needRetryExceptionClass.isAssignableFrom(ex.getClass()) &&
maxRetryTimes > 0) {
+ executeWithRetry(op, needRetryExceptionClass,
maxRetryTimes - 1).whenComplete((res2, ex2) -> {
if (ex2 == null) {
resultFuture.complete(res2);
} else {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index f0d40835656..2ef8f3eae32 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -247,13 +247,13 @@ public class ManagedCursorPropertiesTest extends
MockedBookKeeperTestCase {
map.put("c", "3");
futures.add(executeWithRetry(() -> c1.setCursorProperties(map),
- ManagedLedgerException.BadVersionException.class));
+ ManagedLedgerException.BadVersionException.class, 3));
futures.add(executeWithRetry(() -> c1.putCursorProperty("a", "2"),
- ManagedLedgerException.BadVersionException.class));
+ ManagedLedgerException.BadVersionException.class, 3));
futures.add(executeWithRetry(() -> c1.removeCursorProperty("c"),
- ManagedLedgerException.BadVersionException.class));
+ ManagedLedgerException.BadVersionException.class, 3));
for (CompletableFuture<Void> future : futures) {
future.get();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
new file mode 100644
index 00000000000..5c99e4c307d
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+
+@Slf4j
+public abstract class AbstractDelayedDeliveryTracker implements
DelayedDeliveryTracker, TimerTask {
+
+ protected final PersistentDispatcherMultipleConsumers dispatcher;
+
+ // Reference to the shared (per-broker) timer for delayed delivery
+ protected final Timer timer;
+
+ // Current timeout or null if not set
+ protected Timeout timeout;
+
+ // Timestamp at which the timeout is currently set
+ private long currentTimeoutTarget;
+
+ // Last time the TimerTask was triggered for this class
+ private long lastTickRun;
+
+ protected long tickTimeMillis;
+
+ protected final Clock clock;
+
+ private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+ public
AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
+ long tickTimeMillis,
+ boolean
isDelayedDeliveryDeliverAtTimeStrict) {
+ this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict);
+ }
+
+ public
AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
+ long tickTimeMillis, Clock clock,
+ boolean
isDelayedDeliveryDeliverAtTimeStrict) {
+ this.dispatcher = dispatcher;
+ this.timer = timer;
+ this.tickTimeMillis = tickTimeMillis;
+ this.clock = clock;
+ this.isDelayedDeliveryDeliverAtTimeStrict =
isDelayedDeliveryDeliverAtTimeStrict;
+ }
+
+
+ /**
+ * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow
for early delivery by as much as the
+ * {@link #tickTimeMillis} because it is a slight optimization to let
messages skip going back into the delay
+ * tracker for a brief amount of time when we're already trying to
dispatch to the consumer.
+ *
+ * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the
current time to determine when messages
+ * can be delivered. As a consequence, there are two delays that will
affect delivery. The first is the
+ * {@link #tickTimeMillis} and the second is the {@link Timer}'s
granularity.
+ *
+ * @return the cutoff time to determine whether a message is ready to
deliver to the consumer
+ */
+ protected long getCutoffTime() {
+ return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() :
clock.millis() + tickTimeMillis;
+ }
+
+ public void resetTickTime(long tickTime) {
+ if (this.tickTimeMillis != tickTime) {
+ this.tickTimeMillis = tickTime;
+ }
+ }
+
+ protected void updateTimer() {
+ if (getNumberOfDelayedMessages() == 0) {
+ if (timeout != null) {
+ currentTimeoutTarget = -1;
+ timeout.cancel();
+ timeout = null;
+ }
+ return;
+ }
+ long timestamp = nextDeliveryTime();
+ if (timestamp == currentTimeoutTarget) {
+ // The timer is already set to the correct target time
+ return;
+ }
+
+ if (timeout != null) {
+ timeout.cancel();
+ }
+
+ long now = clock.millis();
+ long delayMillis = timestamp - now;
+
+ if (delayMillis < 0) {
+ // There are messages that are already ready to be delivered. If
+ // the dispatcher is not getting them is because the consumer is
+ // either not connected or slow.
+ // We don't need to keep retriggering the timer. When the consumer
+ // catches up, the dispatcher will do the readMoreEntries() and
+ // get these messages
+ return;
+ }
+
+ // Compute the earliest time that we schedule the timer to run.
+ long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+ long calculatedDelayMillis = Math.max(delayMillis,
remainingTickDelayMillis);
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Start timer in {} millis", dispatcher.getName(),
calculatedDelayMillis);
+ }
+
+ // Even though we may delay longer than this timestamp because of the
tick delay, we still track the
+ // current timeout with reference to the next message's timestamp.
+ currentTimeoutTarget = timestamp;
+ timeout = timer.newTimeout(this, calculatedDelayMillis,
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Timer triggered", dispatcher.getName());
+ }
+ if (timeout == null || timeout.isCancelled()) {
+ return;
+ }
+
+ synchronized (dispatcher) {
+ lastTickRun = clock.millis();
+ currentTimeoutTarget = -1;
+ this.timeout = null;
+ dispatcher.readMoreEntries();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (timeout != null) {
+ timeout.cancel();
+ timeout = null;
+ }
+ }
+
+ protected abstract long nextDeliveryTime();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index da28ff19234..f55d5fd1169 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -18,47 +18,28 @@
*/
package org.apache.pulsar.broker.delayed;
-import io.netty.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
-import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@Slf4j
-public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker,
TimerTask {
+public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker {
protected final TripleLongPriorityQueue priorityQueue = new
TripleLongPriorityQueue();
- private final PersistentDispatcherMultipleConsumers dispatcher;
-
- // Reference to the shared (per-broker) timer for delayed delivery
- private final Timer timer;
-
- // Current timeout or null if not set
- protected Timeout timeout;
-
- // Timestamp at which the timeout is currently set
- private long currentTimeoutTarget;
-
- // Last time the TimerTask was triggered for this class
- private long lastTickRun;
-
- private long tickTimeMillis;
-
- private final Clock clock;
-
- private final boolean isDelayedDeliveryDeliverAtTimeStrict;
-
// If we detect that all messages have fixed delay time, such that the
delivery is
// always going to be in FIFO order, then we can avoid pulling all the
messages in
// tracker. Instead, we use the lookahead for detection and pause the read
from
// the cursor if the delays are fixed.
+ @Getter
+ @VisibleForTesting
private final long fixedDelayDetectionLookahead;
// This is the timestamp of the message with the highest delivery time
@@ -76,33 +57,14 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
fixedDelayDetectionLookahead);
}
- InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
+ public
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean
isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
- this.dispatcher = dispatcher;
- this.timer = timer;
- this.tickTimeMillis = tickTimeMillis;
- this.clock = clock;
- this.isDelayedDeliveryDeliverAtTimeStrict =
isDelayedDeliveryDeliverAtTimeStrict;
+ super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}
- /**
- * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow
for early delivery by as much as the
- * {@link #tickTimeMillis} because it is a slight optimization to let
messages skip going back into the delay
- * tracker for a brief amount of time when we're already trying to
dispatch to the consumer.
- *
- * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the
current time to determine when messages
- * can be delivered. As a consequence, there are two delays that will
affect delivery. The first is the
- * {@link #tickTimeMillis} and the second is the {@link Timer}'s
granularity.
- *
- * @return the cutoff time to determine whether a message is ready to
deliver to the consumer
- */
- private long getCutoffTime() {
- return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() :
clock.millis() + tickTimeMillis;
- }
-
@Override
public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
@@ -115,19 +77,24 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
deliverAt - clock.millis());
}
-
priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();
- // Check that new delivery time comes after the current highest, or at
- // least within a single tick time interval of 1 second.
+ checkAndUpdateHighest(deliverAt);
+
+ return true;
+ }
+
+ /**
+ * Check that new delivery time comes after the current highest, or at
+ * least within a single tick time interval of 1 second.
+ */
+ private void checkAndUpdateHighest(long deliverAt) {
if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
messagesHaveFixedDelay = false;
}
highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked,
deliverAt);
-
- return true;
}
/**
@@ -179,14 +146,6 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
return positions;
}
- @Override
- public void resetTickTime(long tickTime) {
-
- if (this.tickTimeMillis != tickTime) {
- this.tickTimeMillis = tickTime;
- }
- }
-
@Override
public void clear() {
this.priorityQueue.clear();
@@ -202,87 +161,9 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
return priorityQueue.bytesCapacity();
}
- /**
- * Update the scheduled timer task such that:
- * 1. If there are no delayed messages, return and do not schedule a timer
task.
- * 2. If the next message in the queue has the same deliverAt time as the
timer task, return and leave existing
- * timer task in place.
- * 3. If the deliverAt time for the next delayed message has already
passed (i.e. the delay is negative), return
- * without scheduling a timer task since the subscription is backlogged.
- * 4. Else, schedule a timer task where the delay is the greater of these
two: the next message's deliverAt time or
- * the last tick time plus the tickTimeMillis (to ensure we do not
schedule the task more frequently than the
- * tickTimeMillis).
- */
- private void updateTimer() {
- if (priorityQueue.isEmpty()) {
- if (timeout != null) {
- currentTimeoutTarget = -1;
- timeout.cancel();
- timeout = null;
- }
- return;
- }
-
- long timestamp = priorityQueue.peekN1();
- if (timestamp == currentTimeoutTarget) {
- // The timer is already set to the correct target time
- return;
- }
-
- if (timeout != null) {
- timeout.cancel();
- }
-
- long now = clock.millis();
- long delayMillis = timestamp - now;
-
- if (delayMillis < 0) {
- // There are messages that are already ready to be delivered. If
- // the dispatcher is not getting them is because the consumer is
- // either not connected or slow.
- // We don't need to keep retriggering the timer. When the consumer
- // catches up, the dispatcher will do the readMoreEntries() and
- // get these messages
- return;
- }
-
- // Compute the earliest time that we schedule the timer to run.
- long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
- long calculatedDelayMillis = Math.max(delayMillis,
remainingTickDelayMillis);
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Start timer in {} millis", dispatcher.getName(),
calculatedDelayMillis);
- }
-
- // Even though we may delay longer than this timestamp because of the
tick delay, we still track the
- // current timeout with reference to the next message's timestamp.
- currentTimeoutTarget = timestamp;
- timeout = timer.newTimeout(this, calculatedDelayMillis,
TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Timer triggered", dispatcher.getName());
- }
- if (timeout == null || timeout.isCancelled()) {
- return;
- }
-
- synchronized (dispatcher) {
- lastTickRun = clock.millis();
- currentTimeoutTarget = -1;
- this.timeout = null;
- dispatcher.readMoreEntries();
- }
- }
-
@Override
public void close() {
- if (timeout != null) {
- timeout.cancel();
- timeout = null;
- }
+ super.close();
priorityQueue.close();
}
@@ -291,7 +172,7 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
// Pause deliveries if we know all delays are fixed within the
lookahead window
return fixedDelayDetectionLookahead > 0
&& messagesHaveFixedDelay
- && priorityQueue.size() >= fixedDelayDetectionLookahead
+ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
&& !hasMessageAvailable();
}
@@ -299,4 +180,8 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
public boolean containsMessage(long ledgerId, long entryId) {
return false;
}
+
+ protected long nextDeliveryTime() {
+ return priorityQueue.peekN1();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
new file mode 100644
index 00000000000..fbd6d765705
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.roaringbitmap.RoaringBitmap;
+
+@Slf4j
+@Data
+@AllArgsConstructor
+abstract class Bucket {
+
+ static final String DELAYED_BUCKET_KEY_PREFIX =
"#pulsar.internal.delayed.bucket";
+ static final String DELIMITER = "_";
+ static final int MaxRetryTimes = 3;
+
+ protected final ManagedCursor cursor;
+ protected final BucketSnapshotStorage bucketSnapshotStorage;
+
+ long startLedgerId;
+ long endLedgerId;
+
+ Map<Long, RoaringBitmap> delayedIndexBitMap;
+
+ long numberBucketDelayedMessages;
+
+ int lastSegmentEntryId;
+
+ int currentSegmentEntryId;
+
+ long snapshotLength;
+
+ private volatile Long bucketId;
+
+ private volatile CompletableFuture<Long> snapshotCreateFuture;
+
+
+ Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long
startLedgerId, long endLedgerId) {
+ this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1,
-1, 0, 0, null, null);
+ }
+
+ boolean containsMessage(long ledgerId, long entryId) {
+ RoaringBitmap bitSet = delayedIndexBitMap.get(ledgerId);
+ if (bitSet == null) {
+ return false;
+ }
+ return bitSet.contains(entryId, entryId + 1);
+ }
+
+ void putIndexBit(long ledgerId, long entryId) {
+ delayedIndexBitMap.computeIfAbsent(ledgerId, k -> new
RoaringBitmap()).add(entryId, entryId + 1);
+ }
+
+ boolean removeIndexBit(long ledgerId, long entryId) {
+ boolean contained = false;
+ RoaringBitmap bitSet = delayedIndexBitMap.get(ledgerId);
+ if (bitSet != null && bitSet.contains(entryId, entryId + 1)) {
+ contained = true;
+ bitSet.remove(entryId, entryId + 1);
+
+ if (bitSet.isEmpty()) {
+ delayedIndexBitMap.remove(ledgerId);
+ }
+
+ if (numberBucketDelayedMessages > 0) {
+ numberBucketDelayedMessages--;
+ }
+ }
+ return contained;
+ }
+
+ String bucketKey() {
+ return String.join(DELIMITER, DELAYED_BUCKET_KEY_PREFIX,
String.valueOf(startLedgerId),
+ String.valueOf(endLedgerId));
+ }
+
+ Optional<CompletableFuture<Long>> getSnapshotCreateFuture() {
+ return Optional.ofNullable(snapshotCreateFuture);
+ }
+
+ Optional<Long> getBucketId() {
+ return Optional.ofNullable(bucketId);
+ }
+
+ long getAndUpdateBucketId() {
+ Optional<Long> bucketIdOptional = getBucketId();
+ if (bucketIdOptional.isPresent()) {
+ return bucketIdOptional.get();
+ }
+
+ String bucketIdStr = cursor.getCursorProperties().get(bucketKey());
+ long bucketId = Long.parseLong(bucketIdStr);
+ setBucketId(bucketId);
+ return bucketId;
+ }
+
+ CompletableFuture<Long> asyncSaveBucketSnapshot(
+ ImmutableBucket bucketState,
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments) {
+
+ return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments)
+ .thenCompose(newBucketId -> {
+ bucketState.setBucketId(newBucketId);
+ String bucketKey = bucketState.bucketKey();
+ return putBucketKeyId(bucketKey,
newBucketId).exceptionally(ex -> {
+ log.warn("Failed to record bucketId to cursor
property, bucketKey: {}", bucketKey);
+ return null;
+ }).thenApply(__ -> newBucketId);
+ });
+ }
+
+ private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long
bucketId) {
+ Objects.requireNonNull(bucketId);
+ return executeWithRetry(() -> cursor.putCursorProperty(bucketKey,
String.valueOf(bucketId)),
+ ManagedLedgerException.BadVersionException.class,
MaxRetryTimes);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
new file mode 100644
index 00000000000..b7f0e0a1bc1
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeRangeMap;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import java.time.Clock;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+
+@Slf4j
+@ThreadSafe
+public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker {
+
+ static final int AsyncOperationTimeoutSeconds = 30;
+
+ private final long minIndexCountPerBucket;
+
+ private final long timeStepPerBucketSnapshotSegment;
+
+ private final int maxNumBuckets;
+
+ private long numberDelayedMessages;
+
+ private final MutableBucket lastMutableBucket;
+
+ private final TripleLongPriorityQueue sharedBucketPriorityQueue;
+
+ private final RangeMap<Long, ImmutableBucket> immutableBuckets;
+
+ private final Table<Long, Long, ImmutableBucket>
snapshotSegmentLastIndexTable;
+
+ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
+ Timer timer, long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ BucketSnapshotStorage bucketSnapshotStorage,
+ long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegment,
+ int maxNumBuckets) {
+ this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict,
+ bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegment, maxNumBuckets);
+ }
+
+ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
+ Timer timer, long tickTimeMillis, Clock clock,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ BucketSnapshotStorage bucketSnapshotStorage,
+ long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegment,
+ int maxNumBuckets) {
+ super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
+ this.minIndexCountPerBucket = minIndexCountPerBucket;
+ this.timeStepPerBucketSnapshotSegment =
timeStepPerBucketSnapshotSegment;
+ this.maxNumBuckets = maxNumBuckets;
+ this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
+ this.immutableBuckets = TreeRangeMap.create();
+ this.snapshotSegmentLastIndexTable = HashBasedTable.create();
+ this.numberDelayedMessages = 0L;
+ ManagedCursor cursor = dispatcher.getCursor();
+ this.lastMutableBucket = new MutableBucket(cursor,
bucketSnapshotStorage);
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ synchronized (this) {
+ if (timeout == null || timeout.isCancelled()) {
+ return;
+ }
+
lastMutableBucket.moveScheduledMessageToSharedQueue(getCutoffTime(),
sharedBucketPriorityQueue);
+ }
+ super.run(timeout);
+ }
+
+ private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) {
+ if (immutableBuckets.asMapOfRanges().isEmpty()) {
+ return Optional.empty();
+ }
+
+ return Optional.ofNullable(immutableBuckets.get(ledgerId));
+ }
+
+ private void sealBucket() {
+ Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+ this.sharedBucketPriorityQueue);
+ if (immutableBucketDelayedIndexPair != null) {
+ ImmutableBucket immutableBucket =
immutableBucketDelayedIndexPair.getLeft();
+ immutableBuckets.put(Range.closed(immutableBucket.startLedgerId,
immutableBucket.endLedgerId),
+ immutableBucket);
+
+ DelayedIndex lastDelayedIndex =
immutableBucketDelayedIndexPair.getRight();
+ snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(),
+ immutableBucket);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Create bucket snapshot, bucket: {}",
dispatcher.getName(),
+ lastMutableBucket);
+ }
+ }
+ }
+
+ @Override
+ public synchronized boolean addMessage(long ledgerId, long entryId, long
deliverAt) {
+ if (containsMessage(ledgerId, entryId)) {
+ return true;
+ }
+
+ if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+ return false;
+ }
+
+ boolean existBucket = findImmutableBucket(ledgerId).isPresent();
+
+ // Create bucket snapshot
+ if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
+ && lastMutableBucket.size() >= minIndexCountPerBucket
+ && !lastMutableBucket.isEmpty()) {
+ sealBucket();
+ lastMutableBucket.resetLastMutableBucketRange();
+
+ if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+ // TODO merge bucket snapshot (synchronize operate)
+ }
+ }
+
+ if (ledgerId < lastMutableBucket.startLedgerId || existBucket) {
+ // If (ledgerId < startLedgerId || existBucket) means that message
index belong to previous bucket range,
+ // enter sharedBucketPriorityQueue directly
+ sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+ } else {
+ checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
+ lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
+ }
+
+ numberDelayedMessages++;
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Add message {}:{} -- Delivery in {} ms ",
dispatcher.getName(), ledgerId, entryId,
+ deliverAt - clock.millis());
+ }
+
+ updateTimer();
+
+ return true;
+ }
+
+ @Override
+ public synchronized boolean hasMessageAvailable() {
+ long cutoffTime = getCutoffTime();
+
+ boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 &&
nextDeliveryTime() <= cutoffTime;
+ if (!hasMessageAvailable) {
+ updateTimer();
+ }
+ return hasMessageAvailable;
+ }
+
+ @Override
+ protected long nextDeliveryTime() {
+ if (lastMutableBucket.isEmpty() &&
!sharedBucketPriorityQueue.isEmpty()) {
+ return sharedBucketPriorityQueue.peekN1();
+ } else if (sharedBucketPriorityQueue.isEmpty() &&
!lastMutableBucket.isEmpty()) {
+ return lastMutableBucket.nextDeliveryTime();
+ }
+ long timestamp = lastMutableBucket.nextDeliveryTime();
+ long bucketTimestamp = sharedBucketPriorityQueue.peekN1();
+ return Math.min(timestamp, bucketTimestamp);
+ }
+
+ @Override
+ public synchronized long getNumberOfDelayedMessages() {
+ return numberDelayedMessages;
+ }
+
+ @Override
+ public synchronized long getBufferMemoryUsage() {
+ return this.lastMutableBucket.getBufferMemoryUsage() +
sharedBucketPriorityQueue.bytesCapacity();
+ }
+
+ @Override
+ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int
maxMessages) {
+ long cutoffTime = getCutoffTime();
+
+ lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime,
sharedBucketPriorityQueue);
+
+ NavigableSet<PositionImpl> positions = new TreeSet<>();
+ int n = maxMessages;
+
+ while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) {
+ long timestamp = sharedBucketPriorityQueue.peekN1();
+ if (timestamp > cutoffTime) {
+ break;
+ }
+
+ long ledgerId = sharedBucketPriorityQueue.peekN2();
+ long entryId = sharedBucketPriorityQueue.peekN3();
+ positions.add(new PositionImpl(ledgerId, entryId));
+
+ sharedBucketPriorityQueue.pop();
+ removeIndexBit(ledgerId, entryId);
+
+ ImmutableBucket bucket =
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
+ if (bucket != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Load next snapshot segment, bucket: {}",
dispatcher.getName(), bucket);
+ }
+ // All message of current snapshot segment are scheduled, load
next snapshot segment
+ // TODO make it asynchronous and not blocking this process
+ try {
+
bucket.asyncLoadNextBucketSnapshotEntry(false).thenAccept(indexList -> {
+ if (CollectionUtils.isEmpty(indexList)) {
+ return;
+ }
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
+ lastDelayedIndex =
indexList.get(indexList.size() - 1);
+
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
+ lastDelayedIndex.getEntryId(), bucket);
+ for
(DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
+
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
+ index.getEntryId());
+ }
+ }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ // TODO make this segment load again
+ throw new RuntimeException(e);
+ }
+ }
+
+ --n;
+ --numberDelayedMessages;
+ }
+
+ updateTimer();
+
+ return positions;
+ }
+
+ @Override
+ public boolean shouldPauseAllDeliveries() {
+ return false;
+ }
+
+ @Override
+ public synchronized void clear() {
+ cleanImmutableBuckets(true);
+ sharedBucketPriorityQueue.clear();
+ lastMutableBucket.clear();
+ snapshotSegmentLastIndexTable.clear();
+ numberDelayedMessages = 0;
+ }
+
+ @Override
+ public synchronized void close() {
+ super.close();
+ lastMutableBucket.close();
+ cleanImmutableBuckets(false);
+ sharedBucketPriorityQueue.close();
+ }
+
+ private void cleanImmutableBuckets(boolean delete) {
+ if (immutableBuckets != null) {
+ Iterator<ImmutableBucket> iterator =
immutableBuckets.asMapOfRanges().values().iterator();
+ while (iterator.hasNext()) {
+ ImmutableBucket bucket = iterator.next();
+ bucket.clear(delete);
+ iterator.remove();
+ }
+ }
+ }
+
+ private boolean removeIndexBit(long ledgerId, long entryId) {
+ if (lastMutableBucket.removeIndexBit(ledgerId, entryId)) {
+ return true;
+ }
+
+ return findImmutableBucket(ledgerId).map(bucket ->
bucket.removeIndexBit(ledgerId, entryId))
+ .orElse(false);
+ }
+
+ @Override
+ public boolean containsMessage(long ledgerId, long entryId) {
+ if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
+ return true;
+ }
+
+ return findImmutableBucket(ledgerId).map(bucket ->
bucket.containsMessage(ledgerId, entryId))
+ .orElse(false);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketSnapshotStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketSnapshotStorage.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index 7e5fa633dd9..3ab4ce1ad27 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketSnapshotStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.delayed;
+package org.apache.pulsar.broker.delayed.bucket;
import java.util.List;
import java.util.concurrent.CompletableFuture;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
new file mode 100644
index 00000000000..833030c5751
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.collections4.CollectionUtils;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+
+@Slf4j
+class ImmutableBucket extends Bucket {
+ ImmutableBucket(ManagedCursor cursor, BucketSnapshotStorage storage, long
startLedgerId, long endLedgerId) {
+ super(cursor, storage, startLedgerId, endLedgerId);
+ }
+
+ /**
+ * Asynchronous load next bucket snapshot entry.
+ * @param isRecover whether used to recover bucket snapshot
+ * @return CompletableFuture
+ */
+ CompletableFuture<List<DelayedIndex>>
asyncLoadNextBucketSnapshotEntry(boolean isRecover) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Load next bucket snapshot data, bucket: {}",
cursor.getName(), this);
+ }
+
+ // Wait bucket snapshot create finish
+ CompletableFuture<Void> snapshotCreateFuture =
+ getSnapshotCreateFuture().orElseGet(() ->
CompletableFuture.completedFuture(null))
+ .thenApply(__ -> null);
+
+ return snapshotCreateFuture.thenCompose(__ -> {
+ final long bucketId = getAndUpdateBucketId();
+ CompletableFuture<Integer> loadMetaDataFuture = new
CompletableFuture<>();
+ if (isRecover) {
+ // TODO Recover bucket snapshot
+ } else {
+ loadMetaDataFuture.complete(currentSegmentEntryId + 1);
+ }
+
+ return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> {
+ if (nextSegmentEntryId > lastSegmentEntryId) {
+ // TODO Delete bucket snapshot
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId,
nextSegmentEntryId)
+ .thenApply(bucketSnapshotSegments -> {
+ if
(CollectionUtils.isEmpty(bucketSnapshotSegments)) {
+ return Collections.emptyList();
+ }
+
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
+ bucketSnapshotSegments.get(0);
+
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
+ snapshotSegment.getIndexesList();
+ this.setCurrentSegmentEntryId(nextSegmentEntryId);
+ return indexList;
+ });
+ });
+ });
+ }
+
+ void clear(boolean delete) {
+ delayedIndexBitMap.clear();
+ getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
+ if (delete) {
+ snapshotGenerateFuture.cancel(true);
+ // TODO delete bucket snapshot
+ } else {
+ try {
+ snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.warn("Failed wait to snapshot generate, bucketId: {},
bucketKey: {}", getBucketId(),
+ bucketKey());
+ }
+ }
+ });
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
new file mode 100644
index 00000000000..36026298269
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.RoaringBitmap;
+
+@Slf4j
+class MutableBucket extends Bucket implements AutoCloseable {
+
+ private final TripleLongPriorityQueue priorityQueue;
+
+ MutableBucket(ManagedCursor cursor,
+ BucketSnapshotStorage bucketSnapshotStorage) {
+ super(cursor, bucketSnapshotStorage, -1L, -1L);
+ this.priorityQueue = new TripleLongPriorityQueue();
+ }
+
+ Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
+ long timeStepPerBucketSnapshotSegment,
+ TripleLongPriorityQueue sharedQueue) {
+ if (priorityQueue.isEmpty()) {
+ return null;
+ }
+ long numMessages = 0;
+
+ final long startLedgerId = getStartLedgerId();
+ final long endLedgerId = getEndLedgerId();
+
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
+ Map<Long, RoaringBitmap> bitMap = new HashMap<>();
+ SnapshotSegment.Builder snapshotSegmentBuilder =
SnapshotSegment.newBuilder();
+ SnapshotSegmentMetadata.Builder segmentMetadataBuilder =
SnapshotSegmentMetadata.newBuilder();
+
+ long currentTimestampUpperLimit = 0;
+ while (!priorityQueue.isEmpty()) {
+ long timestamp = priorityQueue.peekN1();
+ if (currentTimestampUpperLimit == 0) {
+ currentTimestampUpperLimit = timestamp +
timeStepPerBucketSnapshotSegment - 1;
+ }
+
+ long ledgerId = priorityQueue.peekN2();
+ long entryId = priorityQueue.peekN3();
+
+ checkArgument(ledgerId >= startLedgerId && ledgerId <=
endLedgerId);
+
+ // Move first segment of bucket snapshot to
sharedBucketPriorityQueue
+ if (segmentMetadataList.size() == 0) {
+ sharedQueue.add(timestamp, ledgerId, entryId);
+ }
+
+ priorityQueue.pop();
+ numMessages++;
+
+ DelayedIndex delayedIndex = DelayedIndex.newBuilder()
+ .setTimestamp(timestamp)
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId).build();
+
+ bitMap.computeIfAbsent(ledgerId, k -> new
RoaringBitmap()).add(entryId, entryId + 1);
+
+ snapshotSegmentBuilder.addIndexes(delayedIndex);
+
+ if (priorityQueue.isEmpty() || priorityQueue.peekN1() >
currentTimestampUpperLimit) {
+ segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
+ currentTimestampUpperLimit = 0;
+
+ Iterator<Map.Entry<Long, RoaringBitmap>> iterator =
bitMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RoaringBitmap> entry = iterator.next();
+ byte[] array = new
byte[entry.getValue().serializedSizeInBytes()];
+ entry.getValue().serialize(ByteBuffer.wrap(array));
+
segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(),
ByteString.copyFrom(array));
+ iterator.remove();
+ }
+
+ segmentMetadataList.add(segmentMetadataBuilder.build());
+ segmentMetadataBuilder.clear();
+
+ bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
+ snapshotSegmentBuilder.clear();
+ }
+ }
+
+ SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder()
+ .addAllMetadataList(segmentMetadataList)
+ .build();
+
+ final int lastSegmentEntryId = segmentMetadataList.size();
+
+ ImmutableBucket bucket = new ImmutableBucket(cursor,
bucketSnapshotStorage, startLedgerId, endLedgerId);
+ bucket.setCurrentSegmentEntryId(1);
+ bucket.setNumberBucketDelayedMessages(numMessages);
+ bucket.setLastSegmentEntryId(lastSegmentEntryId);
+
+ // Add the first snapshot segment last message to
snapshotSegmentLastMessageTable
+ checkArgument(!bucketSnapshotSegments.isEmpty());
+ SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
+ DelayedIndex lastDelayedIndex =
snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
+ Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket,
lastDelayedIndex);
+
+ CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
+ bucketSnapshotMetadata, bucketSnapshotSegments);
+ bucket.setSnapshotCreateFuture(future);
+ future.whenComplete((__, ex) -> {
+ if (ex == null) {
+ bucket.setSnapshotCreateFuture(null);
+ } else {
+ //TODO Record create snapshot failed
+ log.error("Failed to create snapshot: ", ex);
+ }
+ });
+
+ return result;
+ }
+
+ void moveScheduledMessageToSharedQueue(long cutoffTime,
TripleLongPriorityQueue sharedBucketPriorityQueue) {
+ while (!priorityQueue.isEmpty()) {
+ long timestamp = priorityQueue.peekN1();
+ if (timestamp > cutoffTime) {
+ break;
+ }
+
+ long ledgerId = priorityQueue.peekN2();
+ long entryId = priorityQueue.peekN3();
+ sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId);
+
+ priorityQueue.pop();
+ }
+ }
+
+ void resetLastMutableBucketRange() {
+ this.startLedgerId = -1L;
+ this.endLedgerId = -1L;
+ }
+
+ void clear() {
+ this.resetLastMutableBucketRange();
+ this.delayedIndexBitMap.clear();
+ }
+
+ public void close() {
+ priorityQueue.close();
+ }
+
+ long getBufferMemoryUsage() {
+ return priorityQueue.bytesCapacity();
+ }
+
+ boolean isEmpty() {
+ return priorityQueue.isEmpty();
+ }
+
+ long nextDeliveryTime() {
+ return priorityQueue.peekN1();
+ }
+
+ long size() {
+ return priorityQueue.size();
+ }
+
+ void addMessage(long ledgerId, long entryId, long deliverAt) {
+ priorityQueue.add(deliverAt, ledgerId, entryId);
+ if (startLedgerId == -1L) {
+ this.startLedgerId = ledgerId;
+ }
+ this.endLedgerId = ledgerId;
+ putIndexBit(ledgerId, entryId);
+ }
+}
diff --git
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
similarity index 59%
copy from
pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
copy to
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
index eda9a8f92e6..0ab45ecded2 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,27 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-syntax = "proto2";
-
-package pulsar.delay;
-option java_package = "org.apache.pulsar.broker.delayed.proto";
-option optimize_for = SPEED;
-
-message DelayedIndex {
- required uint64 timestamp = 1;
- required int64 ledger_id = 2;
- required int64 entry_id = 3;
-}
-
-message SnapshotSegmentMetadata {
- map<uint64, bytes> delayed_index_bit_map = 1;
- required uint64 max_schedule_timestamp = 2;
-}
-
-message SnapshotSegment {
- repeated DelayedIndex indexes = 1;
-}
-
-message SnapshotMetadata {
- repeated SnapshotSegmentMetadata metadata_list = 1;
-}
+package org.apache.pulsar.broker.delayed.bucket;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b777087ba2f..193ee07561d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1120,6 +1120,10 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
return 0;
}
+ public ManagedCursor getCursor() {
+ return cursor;
+ }
+
protected int getStickyKeyHash(Entry entry) {
return
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}
diff --git
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
index eda9a8f92e6..8414a583fe5 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
@@ -24,8 +24,8 @@ option optimize_for = SPEED;
message DelayedIndex {
required uint64 timestamp = 1;
- required int64 ledger_id = 2;
- required int64 entry_id = 3;
+ required uint64 ledger_id = 2;
+ required uint64 entry_id = 3;
}
message SnapshotSegmentMetadata {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
new file mode 100644
index 00000000000..1d166a8db5c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Clock;
+import java.util.Collections;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+public abstract class AbstractDeliveryTrackerTest {
+
+ // Create a single shared timer for the test.
+ protected final Timer timer =
+ new HashedWheelTimer(new
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+ 500, TimeUnit.MILLISECONDS);
+ protected PersistentDispatcherMultipleConsumers dispatcher;
+ protected Clock clock;
+
+ protected AtomicLong clockTime;
+
+ @AfterClass(alwaysRun = true)
+ public void cleanup() {
+ timer.stop();
+ }
+
+ @Test(dataProvider = "delayedTracker")
+ public void test(DelayedDeliveryTracker tracker) throws Exception {
+ assertFalse(tracker.hasMessageAvailable());
+
+ assertTrue(tracker.addMessage(1, 2, 20));
+ assertTrue(tracker.addMessage(2, 1, 10));
+ assertTrue(tracker.addMessage(3, 3, 30));
+ assertTrue(tracker.addMessage(4, 5, 50));
+ assertTrue(tracker.addMessage(5, 4, 40));
+
+ assertFalse(tracker.hasMessageAvailable());
+ assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+
+ assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
+
+ // Move time forward
+ clockTime.set(15);
+
+ // Message is rejected by tracker since it's already ready to send
+ assertFalse(tracker.addMessage(6, 6, 10));
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+ assertTrue(tracker.hasMessageAvailable());
+ Set<PositionImpl> scheduled = tracker.getScheduledMessages(10);
+ assertEquals(scheduled.size(), 1);
+
+ // Move time forward
+ clockTime.set(60);
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 4);
+ assertTrue(tracker.hasMessageAvailable());
+ scheduled = tracker.getScheduledMessages(1);
+ assertEquals(scheduled.size(), 1);
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 3);
+ assertTrue(tracker.hasMessageAvailable());
+ scheduled = tracker.getScheduledMessages(3);
+ assertEquals(scheduled.size(), 3);
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+ assertFalse(tracker.hasMessageAvailable());
+ assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
+
+ tracker.close();
+ }
+
+ @Test(dataProvider = "delayedTracker")
+ public void testWithTimer(DelayedDeliveryTracker tracker,
NavigableMap<Long, TimerTask> tasks) throws Exception {
+ assertTrue(tasks.isEmpty());
+ assertTrue(tracker.addMessage(2, 2, 20));
+ assertEquals(tasks.size(), 1);
+ assertEquals(tasks.firstKey().longValue(), 20);
+
+ assertTrue(tracker.addMessage(1, 1, 10));
+ assertEquals(tasks.size(), 1);
+ assertEquals(tasks.firstKey().longValue(), 10);
+
+ assertTrue(tracker.addMessage(3, 3, 30));
+ assertEquals(tasks.size(), 1);
+ assertEquals(tasks.firstKey().longValue(), 10);
+
+ clockTime.set(15);
+
+ TimerTask task = tasks.pollFirstEntry().getValue();
+ Timeout cancelledTimeout = mock(Timeout.class);
+ when(cancelledTimeout.isCancelled()).thenReturn(true);
+ task.run(cancelledTimeout);
+ verify(dispatcher, atMostOnce()).readMoreEntries();
+
+ task.run(mock(Timeout.class));
+ verify(dispatcher).readMoreEntries();
+
+ tracker.close();
+ }
+
+ /**
+ * Adding a message that is about to expire within the tick time should
lead
+ * to a rejection from the tracker when
isDelayedDeliveryDeliverAtTimeStrict is false.
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void testAddWithinTickTime(DelayedDeliveryTracker tracker) {
+ clockTime.set(0);
+
+ assertFalse(tracker.addMessage(1, 1, 10));
+ assertFalse(tracker.addMessage(2, 2, 99));
+ assertFalse(tracker.addMessage(3, 3, 100));
+ assertTrue(tracker.addMessage(4, 4, 101));
+ assertTrue(tracker.addMessage(5, 5, 200));
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 2);
+
+ tracker.close();
+ }
+
+ @Test(dataProvider = "delayedTracker")
+ public void testAddMessageWithStrictDelay(DelayedDeliveryTracker tracker) {
+ clockTime.set(10);
+
+ // Verify behavior for the less than, equal to, and greater than
deliverAt times.
+ assertFalse(tracker.addMessage(1, 1, 9));
+ assertFalse(tracker.addMessage(4, 4, 10));
+ assertTrue(tracker.addMessage(1, 1, 11));
+
+ assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+ assertFalse(tracker.hasMessageAvailable());
+
+ tracker.close();
+ }
+
+ /**
+ * In this test, the deliverAt time is after now, but the deliverAt time
is too early to run another tick, so the
+ * tickTimeMillis determines the delay.
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void
testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict(DelayedDeliveryTracker
tracker)
+ throws Exception {
+ // Set clock time, then run tracker to inherit clock time as the last
tick time.
+ clockTime.set(10000);
+ Timeout timeout = mock(Timeout.class);
+ when(timeout.isCancelled()).then(x -> false);
+ ((AbstractDelayedDeliveryTracker) tracker).run(timeout);
+ verify(dispatcher, times(1)).readMoreEntries();
+
+ // Add a message that has a delivery time just after the previous run.
It will get delivered based on the
+ // tick delay plus the last tick run.
+ assertTrue(tracker.addMessage(1, 1, 10001));
+
+ // Wait longer than the tick time plus the HashedWheelTimer's tick
time to ensure that enough time has
+ // passed where it would have been triggered if the tick time was
doing the triggering.
+ Thread.sleep(600);
+ verify(dispatcher, times(1)).readMoreEntries();
+
+ // Not wait for the message delivery to get triggered.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+
+ tracker.close();
+ }
+
+ /**
+ * In this test, the deliverAt time is after now, but before the
(tickTimeMillis + now). Because there wasn't a
+ * recent tick run, the deliverAt time determines the delay.
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void
testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict(
+ DelayedDeliveryTracker tracker) {
+ clockTime.set(500000);
+
+ assertTrue(tracker.addMessage(1, 1, 500005));
+
+ // Wait long enough for the runnable to run, but not longer than the
tick time. The point is that the delivery
+ // should get scheduled early when the tick duration has passed since
the last tick.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+
+ tracker.close();
+ }
+
+ /**
+ * In this test, the deliverAt time is after now plus tickTimeMillis, so
the tickTimeMillis determines the delay.
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void
testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict(DelayedDeliveryTracker
tracker)
+ throws Exception {
+ clockTime.set(0);
+
+ assertTrue(tracker.addMessage(1, 1, 2000));
+
+ // Wait longer than the tick time plus the HashedWheelTimer's tick
time to ensure that enough time has
+ // passed where it would have been triggered if the tick time was
doing the triggering.
+ Thread.sleep(1000);
+
+ // Not wait for the message delivery to get triggered.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+
+ tracker.close();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
new file mode 100644
index 00000000000..331ceb83a99
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.lang.reflect.Method;
+import java.time.Clock;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
+
+ private BucketSnapshotStorage bucketSnapshotStorage;
+
+ @AfterMethod
+ public void clean() throws Exception {
+ if (bucketSnapshotStorage != null) {
+ bucketSnapshotStorage.close();
+ }
+ }
+
+ @DataProvider(name = "delayedTracker")
+ public Object[][] provider(Method method) throws Exception {
+ dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+ clock = mock(Clock.class);
+ clockTime = new AtomicLong();
+ when(clock.millis()).then(x -> clockTime.get());
+
+ bucketSnapshotStorage = new MockBucketSnapshotStorage();
+ bucketSnapshotStorage.start();
+ ManagedCursor cursor = new MockManagedCursor("my_test_cursor");
+ doReturn(cursor).when(dispatcher).getCursor();
+
+ final String methodName = method.getName();
+ return switch (methodName) {
+ case "test" -> new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ }};
+ case "testWithTimer" -> {
+ Timer timer = mock(Timer.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
+
+ when(timer.newTimeout(any(), anyLong(),
any())).then(invocation -> {
+ TimerTask task = invocation.getArgument(0,
TimerTask.class);
+ long timeout = invocation.getArgument(1, Long.class);
+ TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+ long scheduleAt = clockTime.get() + unit.toMillis(timeout);
+ tasks.put(scheduleAt, task);
+
+ Timeout t = mock(Timeout.class);
+ when(t.cancel()).then(i -> {
+ tasks.remove(scheduleAt, task);
+ return null;
+ });
+ return t;
+ });
+
+ yield new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50),
+ tasks
+ }};
+ }
+ case "testAddWithinTickTime" -> new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
+ false, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ }};
+ case "testAddMessageWithStrictDelay" -> new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ }};
+ case
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" ->
new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ }};
+ case
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict",
"testRecoverSnapshot" ->
+ new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher,
timer, 100000, clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ }};
+ case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict",
"testExistDelayedMessage" ->
+ new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher,
timer, 500, clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), 50)
+ }};
+ default -> new Object[][]{{
+ new BucketDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ true, bucketSnapshotStorage, 1000,
TimeUnit.MILLISECONDS.toMillis(100), 50)
+ }};
+ };
+ }
+
+ @Test(dataProvider = "delayedTracker")
+ public void testContainsMessage(DelayedDeliveryTracker tracker) {
+ tracker.addMessage(1, 1, 10);
+ tracker.addMessage(2, 2, 20);
+
+ assertTrue(tracker.containsMessage(1, 1));
+ clockTime.set(20);
+
+ Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(1);
+
assertEquals(scheduledMessages.stream().findFirst().get().getEntryId(), 1);
+
+ tracker.addMessage(3, 3, 30);
+
+ tracker.addMessage(4, 4, 30);
+
+ tracker.addMessage(5, 5, 30);
+
+ tracker.addMessage(6, 6, 30);
+
+ assertTrue(tracker.containsMessage(3, 3));
+
+ tracker.close();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 1a98233c385..6711aed924c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -18,13 +18,9 @@
*/
package org.apache.pulsar.broker.delayed;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -35,297 +31,95 @@ import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Method;
import java.time.Clock;
-import java.util.Collections;
import java.util.NavigableMap;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.Cleanup;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
-import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class InMemoryDeliveryTrackerTest {
+public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
- // Create a single shared timer for the test.
- private final Timer timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
- 500, TimeUnit.MILLISECONDS);
-
- @AfterClass(alwaysRun = true)
- public void cleanup() {
- timer.stop();
- }
-
- @Test
- public void test() throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
+ @DataProvider(name = "delayedTracker")
+ public Object[][] provider(Method method) throws Exception {
+ dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+ clock = mock(Clock.class);
+ clockTime = new AtomicLong();
when(clock.millis()).then(x -> clockTime.get());
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- false, 0);
-
- assertFalse(tracker.hasMessageAvailable());
-
- assertTrue(tracker.addMessage(2, 2, 20));
- assertTrue(tracker.addMessage(1, 1, 10));
- assertTrue(tracker.addMessage(3, 3, 30));
- assertTrue(tracker.addMessage(5, 5, 50));
- assertTrue(tracker.addMessage(4, 4, 40));
-
- assertFalse(tracker.hasMessageAvailable());
- assertEquals(tracker.getNumberOfDelayedMessages(), 5);
-
- assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
-
- // Move time forward
- clockTime.set(15);
-
- // Message is rejected by tracker since it's already ready to send
- assertFalse(tracker.addMessage(6, 6, 10));
-
- assertEquals(tracker.getNumberOfDelayedMessages(), 5);
- assertTrue(tracker.hasMessageAvailable());
- Set<PositionImpl> scheduled = tracker.getScheduledMessages(10);
- assertEquals(scheduled.size(), 1);
-
- // Move time forward
- clockTime.set(60);
-
- assertEquals(tracker.getNumberOfDelayedMessages(), 4);
- assertTrue(tracker.hasMessageAvailable());
- scheduled = tracker.getScheduledMessages(1);
- assertEquals(scheduled.size(), 1);
-
- assertEquals(tracker.getNumberOfDelayedMessages(), 3);
- assertTrue(tracker.hasMessageAvailable());
- scheduled = tracker.getScheduledMessages(3);
- assertEquals(scheduled.size(), 3);
-
- assertEquals(tracker.getNumberOfDelayedMessages(), 0);
- assertFalse(tracker.hasMessageAvailable());
- assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
- }
-
- @Test
- public void testWithTimer() throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
- Timer timer = mock(Timer.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
-
- when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> {
- TimerTask task = invocation.getArgument(0, TimerTask.class);
- long timeout = invocation.getArgument(1, Long.class);
- TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
- long scheduleAt = clockTime.get() + unit.toMillis(timeout);
- tasks.put(scheduleAt, task);
-
- Timeout t = mock(Timeout.class);
- when(t.cancel()).then(i -> {
- tasks.remove(scheduleAt, task);
- return null;
- });
- return t;
- });
-
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- false, 0);
-
- assertTrue(tasks.isEmpty());
- assertTrue(tracker.addMessage(2, 2, 20));
- assertEquals(tasks.size(), 1);
- assertEquals(tasks.firstKey().longValue(), 20);
-
- assertTrue(tracker.addMessage(1, 1, 10));
- assertEquals(tasks.size(), 1);
- assertEquals(tasks.firstKey().longValue(), 10);
-
- assertTrue(tracker.addMessage(3, 3, 30));
- assertEquals(tasks.size(), 1);
- assertEquals(tasks.firstKey().longValue(), 10);
-
- clockTime.set(15);
-
- TimerTask task = tasks.pollFirstEntry().getValue();
- Timeout cancelledTimeout = mock(Timeout.class);
- when(cancelledTimeout.isCancelled()).thenReturn(true);
- task.run(cancelledTimeout);
- verifyZeroInteractions(dispatcher);
-
- task.run(mock(Timeout.class));
- verify(dispatcher).readMoreEntries();
- }
-
- /**
- * Adding a message that is about to expire within the tick time should
lead
- * to a rejection from the tracker when
isDelayedDeliveryDeliverAtTimeStrict is false.
- */
- @Test
- public void testAddWithinTickTime() {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
- false, 0);
-
- clockTime.set(0);
-
- assertFalse(tracker.addMessage(1, 1, 10));
- assertFalse(tracker.addMessage(2, 2, 99));
- assertFalse(tracker.addMessage(3, 3, 100));
- assertTrue(tracker.addMessage(4, 4, 101));
- assertTrue(tracker.addMessage(5, 5, 200));
-
- assertEquals(tracker.getNumberOfDelayedMessages(), 2);
- }
-
- public void testAddMessageWithStrictDelay() {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
- true, 0);
-
- clockTime.set(10);
-
- // Verify behavior for the less than, equal to, and greater than
deliverAt times.
- assertFalse(tracker.addMessage(1, 1, 9));
- assertFalse(tracker.addMessage(4, 4, 10));
- assertTrue(tracker.addMessage(1, 1, 11));
-
- assertEquals(tracker.getNumberOfDelayedMessages(), 1);
- assertFalse(tracker.hasMessageAvailable());
- }
-
- /**
- * In this test, the deliverAt time is after now, but the deliverAt time
is too early to run another tick, so the
- * tickTimeMillis determines the delay.
- */
- public void
testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict()
throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- // Use a short tick time to show that the timer task is run based on
the deliverAt time in this scenario.
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer,
- 1000, clock, true, 0);
-
- // Set clock time, then run tracker to inherit clock time as the last
tick time.
- clockTime.set(10000);
- Timeout timeout = mock(Timeout.class);
- when(timeout.isCancelled()).then(x -> false);
- tracker.run(timeout);
- verify(dispatcher, times(1)).readMoreEntries();
-
- // Add a message that has a delivery time just after the previous run.
It will get delivered based on the
- // tick delay plus the last tick run.
- assertTrue(tracker.addMessage(1, 1, 10001));
-
- // Wait longer than the tick time plus the HashedWheelTimer's tick
time to ensure that enough time has
- // passed where it would have been triggered if the tick time was
doing the triggering.
- Thread.sleep(600);
- verify(dispatcher, times(1)).readMoreEntries();
-
- // Not wait for the message delivery to get triggered.
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> verify(dispatcher).readMoreEntries());
- }
-
- /**
- * In this test, the deliverAt time is after now, but before the
(tickTimeMillis + now). Because there wasn't a
- * recent tick run, the deliverAt time determines the delay.
- */
- public void
testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- // Use a large tick time to show that the message will get delivered
earlier because there wasn't
- // a previous tick run.
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer,
- 100000, clock, true, 0);
-
- clockTime.set(500000);
-
- assertTrue(tracker.addMessage(1, 1, 500005));
-
- // Wait long enough for the runnable to run, but not longer than the
tick time. The point is that the delivery
- // should get scheduled early when the tick duration has passed since
the last tick.
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> verify(dispatcher).readMoreEntries());
- }
-
- /**
- * In this test, the deliverAt time is after now plus tickTimeMillis, so
the tickTimeMillis determines the delay.
- */
- public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict()
throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- // Use a short tick time to show that the timer task is run based on
the deliverAt time in this scenario.
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer,
- 500, clock, true, 0);
-
- clockTime.set(0);
-
- assertTrue(tracker.addMessage(1, 1, 2000));
-
- // Wait longer than the tick time plus the HashedWheelTimer's tick
time to ensure that enough time has
- // passed where it would have been triggered if the tick time was
doing the triggering.
- Thread.sleep(1000);
- verifyNoInteractions(dispatcher);
-
- // Not wait for the message delivery to get triggered.
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+ final String methodName = method.getName();
+ return switch (methodName) {
+ case "test" -> new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ false, 0)
+ }};
+ case "testWithTimer" -> {
+ Timer timer = mock(Timer.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
+
+ when(timer.newTimeout(any(), anyLong(),
any())).then(invocation -> {
+ TimerTask task = invocation.getArgument(0,
TimerTask.class);
+ long timeout = invocation.getArgument(1, Long.class);
+ TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+ long scheduleAt = clockTime.get() + unit.toMillis(timeout);
+ tasks.put(scheduleAt, task);
+
+ Timeout t = mock(Timeout.class);
+ when(t.cancel()).then(i -> {
+ tasks.remove(scheduleAt, task);
+ return null;
+ });
+ return t;
+ });
+
+ yield new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer,
1, clock,
+ false, 0),
+ tasks
+ }};
+ }
+ case "testAddWithinTickTime" -> new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
+ false, 0)
+ }};
+ case "testAddMessageWithStrictDelay" -> new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
+ true, 0)
+ }};
+ case
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" ->
new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer,
1000, clock,
+ true, 0)
+ }};
+ case
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" ->
new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer,
100000, clock,
+ true, 0)
+ }};
+ case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict"
-> new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500,
clock,
+ true, 0)
+ }};
+ case "testWithFixedDelays",
"testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500,
clock,
+ true, 100)
+ }};
+ default -> new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ true, 0)
+ }};
+ };
}
- @Test
- public void testWithFixedDelays() throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- final long fixedDelayLookahead = 100;
-
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- true, fixedDelayLookahead);
-
+ @Test(dataProvider = "delayedTracker")
+ public void testWithFixedDelays(InMemoryDelayedDeliveryTracker tracker)
throws Exception {
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(1, 1, 10));
@@ -338,15 +132,16 @@ public class InMemoryDeliveryTrackerTest {
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertFalse(tracker.shouldPauseAllDeliveries());
- for (int i = 6; i <= fixedDelayLookahead; i++) {
+ for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
- clockTime.set(fixedDelayLookahead * 10);
+ clockTime.set(tracker.getFixedDelayDetectionLookahead() * 10);
tracker.getScheduledMessages(100);
+
assertFalse(tracker.shouldPauseAllDeliveries());
// Empty the tracker
@@ -356,22 +151,12 @@ public class InMemoryDeliveryTrackerTest {
} while (removed > 0);
assertFalse(tracker.shouldPauseAllDeliveries());
- }
-
- @Test
- public void testWithMixedDelays() throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- long fixedDelayLookahead = 100;
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- true, fixedDelayLookahead);
+ tracker.close();
+ }
+ @Test(dataProvider = "delayedTracker")
+ public void testWithMixedDelays(InMemoryDelayedDeliveryTracker tracker)
throws Exception {
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(1, 1, 10));
@@ -382,32 +167,22 @@ public class InMemoryDeliveryTrackerTest {
assertFalse(tracker.shouldPauseAllDeliveries());
- for (int i = 6; i <= fixedDelayLookahead; i++) {
+ for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
// Add message with earlier delivery time
- assertTrue(tracker.addMessage(5, 5, 5));
+ assertTrue(tracker.addMessage(5, 6, 5));
assertFalse(tracker.shouldPauseAllDeliveries());
- }
-
- @Test
- public void testWithNoDelays() throws Exception {
- PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
-
- AtomicLong clockTime = new AtomicLong();
- Clock clock = mock(Clock.class);
- when(clock.millis()).then(x -> clockTime.get());
-
- long fixedDelayLookahead = 100;
- @Cleanup
- InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- true, fixedDelayLookahead);
+ tracker.close();
+ }
+ @Test(dataProvider = "delayedTracker")
+ public void testWithNoDelays(InMemoryDelayedDeliveryTracker tracker)
throws Exception {
assertFalse(tracker.hasMessageAvailable());
assertTrue(tracker.addMessage(1, 1, 10));
@@ -418,16 +193,18 @@ public class InMemoryDeliveryTrackerTest {
assertFalse(tracker.shouldPauseAllDeliveries());
- for (int i = 6; i <= fixedDelayLookahead; i++) {
+ for (int i = 6; i <= tracker.getFixedDelayDetectionLookahead(); i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
// Add message with no-delay
- assertFalse(tracker.addMessage(5, 5, -1L));
+ assertFalse(tracker.addMessage(5, 6, -1L));
assertFalse(tracker.shouldPauseAllDeliveries());
+
+ tracker.close();
}
@Test
@@ -471,5 +248,4 @@ public class InMemoryDeliveryTrackerTest {
timer.stop();
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
new file mode 100644
index 00000000000..89831a1d5e7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@Slf4j
+public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
+
+ private final AtomicLong maxBucketId;
+
+ private final Map<Long, List<ByteBuf>> bucketSnapshots;
+
+ private final ExecutorService executorService =
+ new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, new
LinkedBlockingQueue<>(),
+ new DefaultThreadFactory("bucket-snapshot-storage-io"));
+
+ public MockBucketSnapshotStorage() {
+ this.bucketSnapshots = new ConcurrentHashMap<>();
+ this.maxBucketId = new AtomicLong();
+ }
+
+ @Override
+ public CompletableFuture<Long> createBucketSnapshot(
+ SnapshotMetadata snapshotMetadata, List<SnapshotSegment>
bucketSnapshotSegments) {
+ return CompletableFuture.supplyAsync(() -> {
+ long bucketId = maxBucketId.getAndIncrement();
+ List<ByteBuf> entries = new ArrayList<>();
+ byte[] bytes = snapshotMetadata.toByteArray();
+ ByteBuf byteBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(bytes.length);
+ byteBuf.writeBytes(bytes);
+ entries.add(byteBuf);
+ this.bucketSnapshots.put(bucketId, entries);
+ return bucketId;
+ }, executorService).thenApply(bucketId -> {
+ List<ByteBuf> bufList = new ArrayList<>();
+ for (SnapshotSegment snapshotSegment : bucketSnapshotSegments) {
+ byte[] bytes = snapshotSegment.toByteArray();
+ ByteBuf byteBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(bytes.length);
+ byteBuf.writeBytes(bytes);
+ bufList.add(byteBuf);
+ }
+ bucketSnapshots.get(bucketId).addAll(bufList);
+
+ return bucketId;
+ });
+ }
+
+ @Override
+ public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long
bucketId) {
+ return CompletableFuture.supplyAsync(() -> {
+ ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(0);
+ SnapshotMetadata snapshotMetadata;
+ try {
+ snapshotMetadata =
SnapshotMetadata.parseFrom(byteBuf.nioBuffer());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ return snapshotMetadata;
+ }, executorService);
+ }
+
+ @Override
+ public CompletableFuture<List<SnapshotSegment>>
getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+
long lastSegmentEntryId) {
+ return CompletableFuture.supplyAsync(() -> {
+ List<SnapshotSegment> snapshotSegments = new ArrayList<>();
+ long lastEntryId = Math.min(lastSegmentEntryId,
this.bucketSnapshots.get(bucketId).size());
+ for (int i = (int) firstSegmentEntryId; i <= lastEntryId ; i++) {
+ ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(i);
+ SnapshotSegment snapshotSegment;
+ try {
+ snapshotSegment =
SnapshotSegment.parseFrom(byteBuf.nioBuffer());
+ snapshotSegments.add(snapshotSegment);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return snapshotSegments;
+ }, executorService);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+ return CompletableFuture.supplyAsync(() -> {
+ List<ByteBuf> remove = this.bucketSnapshots.remove(bucketId);
+ if (remove != null) {
+ for (ByteBuf byteBuf : remove) {
+ byteBuf.release();
+ }
+ }
+ return null;
+ }, executorService);
+ }
+
+ @Override
+ public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+ return CompletableFuture.supplyAsync(() -> {
+ long length = 0;
+ List<ByteBuf> bufList = this.bucketSnapshots.get(bucketId);
+ for (ByteBuf byteBuf : bufList) {
+ length += byteBuf.array().length;
+ }
+ return length;
+ }, executorService);
+ }
+
+ @Override
+ public void start() throws Exception {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ clean();
+ }
+
+ public void clean() {
+ for (List<ByteBuf> value : bucketSnapshots.values()) {
+ for (ByteBuf byteBuf : value) {
+ byteBuf.release();
+ }
+ }
+ bucketSnapshots.clear();
+ executorService.shutdownNow();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
new file mode 100644
index 00000000000..efb0fa7ab7b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import com.google.common.collect.Range;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+public class MockManagedCursor implements ManagedCursor {
+
+ private final String name;
+
+ private final Map<String, String> cursorProperties;
+
+ public MockManagedCursor(String name) {
+ this.name = name;
+ this.cursorProperties = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public long getLastActive() {
+ return 0;
+ }
+
+ @Override
+ public void updateLastActive() {
+
+ }
+
+ @Override
+ public Map<String, Long> getProperties() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getCursorProperties() {
+ return this.cursorProperties;
+ }
+
+ @Override
+ public CompletableFuture<Void> putCursorProperty(String key, String value)
{
+ cursorProperties.put(key, value);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> setCursorProperties(Map<String, String>
cursorProperties) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ public CompletableFuture<Void> removeCursorProperty(String key) {
+ cursorProperties.remove(key);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public boolean putProperty(String key, Long value) {
+ return false;
+ }
+
+ @Override
+ public boolean removeProperty(String key) {
+ return false;
+ }
+
+ @Override
+ public List<Entry> readEntries(int numberOfEntriesToRead) throws
InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncReadEntries(int numberOfEntriesToRead,
AsyncCallbacks.ReadEntriesCallback callback, Object ctx,
+ PositionImpl maxPosition) {
+
+ }
+
+ @Override
+ public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes,
+ AsyncCallbacks.ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
+
+ }
+
+ @Override
+ public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries)
+ throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncGetNthEntry(int n, IndividualDeletedEntries
deletedEntries,
+ AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
+
+ }
+
+ @Override
+ public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
+ throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
+ throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncReadEntriesOrWait(int numberOfEntriesToRead,
AsyncCallbacks.ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition) {
+
+ }
+
+ @Override
+ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes,
AsyncCallbacks.ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition) {
+
+ }
+
+ @Override
+ public boolean cancelPendingReadRequest() {
+ return false;
+ }
+
+ @Override
+ public boolean hasMoreEntries() {
+ return false;
+ }
+
+ @Override
+ public long getNumberOfEntries() {
+ return 0;
+ }
+
+ @Override
+ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
+ return 0;
+ }
+
+ @Override
+ public void markDelete(Position position) throws InterruptedException,
ManagedLedgerException {
+
+ }
+
+ @Override
+ public void markDelete(Position position, Map<String, Long> properties)
+ throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncMarkDelete(Position position,
AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public void asyncMarkDelete(Position position, Map<String, Long>
properties,
+ AsyncCallbacks.MarkDeleteCallback callback,
Object ctx) {
+
+ }
+
+ @Override
+ public void delete(Position position) throws InterruptedException,
ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncDelete(Position position, AsyncCallbacks.DeleteCallback
callback, Object ctx) {
+
+ }
+
+ @Override
+ public void delete(Iterable<Position> positions) throws
InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncDelete(Iterable<Position> position,
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public Position getReadPosition() {
+ return null;
+ }
+
+ @Override
+ public Position getMarkDeletedPosition() {
+ return null;
+ }
+
+ @Override
+ public Position getPersistentMarkDeletedPosition() {
+ return null;
+ }
+
+ @Override
+ public void rewind() {
+
+ }
+
+ @Override
+ public void seek(Position newReadPosition, boolean force) {
+
+ }
+
+ @Override
+ public void clearBacklog() throws InterruptedException,
ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncClearBacklog(AsyncCallbacks.ClearBacklogCallback
callback, Object ctx) {
+
+ }
+
+ @Override
+ public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries
deletedEntries)
+ throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncSkipEntries(int numEntriesToSkip,
IndividualDeletedEntries deletedEntries,
+ AsyncCallbacks.SkipEntriesCallback callback,
Object ctx) {
+
+ }
+
+ @Override
+ public Position findNewestMatching(java.util.function.Predicate<Entry>
condition)
+ throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public Position findNewestMatching(FindPositionConstraint constraint,
java.util.function.Predicate<Entry> condition)
+ throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public void asyncFindNewestMatching(FindPositionConstraint constraint,
+ java.util.function.Predicate<Entry>
condition,
+ AsyncCallbacks.FindEntryCallback
callback, Object ctx) {
+
+ }
+
+ @Override
+ public void resetCursor(Position position) throws InterruptedException,
ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncResetCursor(Position position, boolean forceReset,
AsyncCallbacks.ResetCursorCallback callback) {
+
+ }
+
+ @Override
+ public List<Entry> replayEntries(Set<? extends Position> positions)
+ throws InterruptedException, ManagedLedgerException {
+ return null;
+ }
+
+ @Override
+ public Set<? extends Position> asyncReplayEntries(Set<? extends Position>
positions,
+
AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+ return null;
+ }
+
+ @Override
+ public Set<? extends Position> asyncReplayEntries(Set<? extends Position>
positions,
+
AsyncCallbacks.ReadEntriesCallback callback, Object ctx,
+ boolean sortEntries) {
+ return null;
+ }
+
+ @Override
+ public void close() throws InterruptedException, ManagedLedgerException {
+
+ }
+
+ @Override
+ public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
+
+ }
+
+ @Override
+ public Position getFirstPosition() {
+ return null;
+ }
+
+ @Override
+ public void setActive() {
+
+ }
+
+ @Override
+ public void setInactive() {
+
+ }
+
+ @Override
+ public void setAlwaysInactive() {
+
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public boolean isDurable() {
+ return false;
+ }
+
+ @Override
+ public long getNumberOfEntriesSinceFirstNotAckedMessage() {
+ return 0;
+ }
+
+ @Override
+ public int getTotalNonContiguousDeletedMessagesRange() {
+ return 0;
+ }
+
+ @Override
+ public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+ return 0;
+ }
+
+ @Override
+ public long getEstimatedSizeSinceMarkDeletePosition() {
+ return 0;
+ }
+
+ @Override
+ public double getThrottleMarkDelete() {
+ return 0;
+ }
+
+ @Override
+ public void setThrottleMarkDelete(double throttleMarkDelete) {
+
+ }
+
+ @Override
+ public ManagedLedger getManagedLedger() {
+ return null;
+ }
+
+ @Override
+ public Range<PositionImpl> getLastIndividualDeletedRange() {
+ return null;
+ }
+
+ @Override
+ public void trimDeletedEntries(List<Entry> entries) {
+
+ }
+
+ @Override
+ public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
+ return new long[0];
+ }
+
+ @Override
+ public ManagedCursorMXBean getStats() {
+ return null;
+ }
+
+ @Override
+ public boolean checkAndUpdateReadPositionChanged() {
+ return false;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+}