This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 027f4e94b4a [fix][broker] Fix delayed messages stalling with isDelayedDeliveryDeliverAtTimeStrict=true (#26012)
027f4e94b4a is described below
commit 027f4e94b4a2e975c4aae2d6111bf31d201dc8b1
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Jun 13 07:01:22 2026 +0800
[fix][broker] Fix delayed messages stalling with isDelayedDeliveryDeliverAtTimeStrict=true (#26012)
Co-authored-by: Lari Hotari <[email protected]>
---
.../delayed/AbstractDelayedDeliveryTracker.java | 99 ++++++++---
.../delayed/InMemoryDelayedDeliveryTracker.java | 53 ++++--
.../bucket/BucketDelayedDeliveryTracker.java | 7 +-
.../delayed/InMemoryDeliveryTrackerTest.java | 186 ++++++++++++++++++++-
4 files changed, 302 insertions(+), 43 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
index 2caf71a6eda..5140e9866bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
@@ -36,21 +36,26 @@ public abstract class AbstractDelayedDeliveryTracker
implements DelayedDeliveryT
// Reference to the shared (per-broker) timer for delayed delivery
protected final Timer timer;
- // Current timeout or null if not set
- protected Timeout timeout;
+ // Current timeout or null if not set. Guarded by timeoutLock.
+ private Timeout timeout;
- // Timestamp at which the timeout is currently set
+ // Timestamp at which the timeout is currently set. Guarded by timeoutLock.
private long currentTimeoutTarget;
- // Last time the TimerTask was triggered for this class
+ // Last time the TimerTask was triggered for this class. Guarded by
timeoutLock.
private long lastTickRun;
- protected long tickTimeMillis;
+ // Updated through resetTickTime() from dispatcher threads and read on the
timer thread.
+ protected volatile long tickTimeMillis;
protected final Clock clock;
private final boolean isDelayedDeliveryDeliverAtTimeStrict;
private final Object triggerLock;
+ // Guards the timer state (timeout, currentTimeoutTarget, lastTickRun)
against concurrent access from
+ // dispatcher threads (updateTimer/rescheduleTimer/close) and the timer
thread (run). It is a leaf lock:
+ // no subclass method is invoked while holding it.
+ private final Object timeoutLock = new Object();
public
AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
long tickTimeMillis,
@@ -99,14 +104,34 @@ public abstract class AbstractDelayedDeliveryTracker
implements DelayedDeliveryT
return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() :
clock.millis() + tickTimeMillis;
}
+ protected boolean isDeliverAtTimeStrict() {
+ return isDelayedDeliveryDeliverAtTimeStrict;
+ }
+
public void resetTickTime(long tickTime) {
if (this.tickTimeMillis != tickTime) {
this.tickTimeMillis = tickTime;
}
}
- protected void updateTimer() {
- if (getNumberOfDelayedMessages() == 0) {
+ /**
+ * Update the delivery timer to fire when the next message in the tracker
becomes due.
+ *
+ * Callers are expected to serialize all tracker state mutations (at the
dispatcher or tracker level), so the
+ * snapshot of {@link #getNumberOfDelayedMessages()} and {@link
#nextDeliveryTime()} is taken before acquiring
+ * timeoutLock. This keeps timeoutLock a leaf lock that never calls into
subclass methods, ruling out lock
+ * ordering deadlocks with subclasses that synchronize those methods on
the tracker instance.
+ */
+ protected final void updateTimer() {
+ long numberOfDelayedMessages = getNumberOfDelayedMessages();
+ long nextDeliveryTimestamp = numberOfDelayedMessages > 0 ?
nextDeliveryTime() : -1;
+ synchronized (timeoutLock) {
+ doUpdateTimer(numberOfDelayedMessages, nextDeliveryTimestamp);
+ }
+ }
+
+ private void doUpdateTimer(long numberOfDelayedMessages, long timestamp) {
+ if (numberOfDelayedMessages == 0) {
if (timeout != null) {
currentTimeoutTarget = -1;
timeout.cancel();
@@ -114,7 +139,6 @@ public abstract class AbstractDelayedDeliveryTracker
implements DelayedDeliveryT
}
return;
}
- long timestamp = nextDeliveryTime();
if (timestamp == currentTimeoutTarget) {
// The timer is already set to the correct target time
return;
@@ -122,52 +146,83 @@ public abstract class AbstractDelayedDeliveryTracker
implements DelayedDeliveryT
if (timeout != null) {
timeout.cancel();
+ timeout = null;
}
+ // Reset the tracked state so a subsequent updateTimer() call cannot
short-circuit on a stale
+ // currentTimeoutTarget while no live timer remains. See #25996.
+ currentTimeoutTarget = -1;
long now = clock.millis();
long delayMillis = timestamp - now;
- if (delayMillis < 0) {
+ if (delayMillis <= 0) {
// There are messages that are already ready to be delivered. If
// the dispatcher is not getting them is because the consumer is
// either not connected or slow.
// We don't need to keep retriggering the timer. When the consumer
// catches up, the dispatcher will do the readMoreEntries() and
- // get these messages
+ // get these messages.
return;
}
// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis,
remainingTickDelayMillis);
- log.debug().attr("delayMillis", calculatedDelayMillis)
- .log("Start timer");
- // Even though we may delay longer than this timestamp because
of the tick delay, we still track the
+ log.debug().attr("delayMillis", calculatedDelayMillis)
+ .log("Start timer");
+ // Even though we may delay longer than this timestamp because of the
tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, calculatedDelayMillis,
TimeUnit.MILLISECONDS);
}
@Override
- public void run(Timeout timeout) throws Exception {
- log.debug("Timer triggered");
- if (timeout == null || timeout.isCancelled()) {
+ public void run(Timeout triggeredTimeout) throws Exception {
+ log.debug("Timer triggered");
+
+ if (triggeredTimeout == null || triggeredTimeout.isCancelled()) {
return;
}
- synchronized (triggerLock) {
+ synchronized (timeoutLock) {
lastTickRun = clock.millis();
- currentTimeoutTarget = -1;
- this.timeout = null;
+ // Only reset the timer state if the triggered timeout is the
currently armed one. A timeout that
+ // was already superseded by updateTimer()/rescheduleTimer() may
still fire if it passed its
+ // isCancelled() check before being cancelled; it must not clear
the state of the newer timer.
+ if (triggeredTimeout == this.timeout) {
+ currentTimeoutTarget = -1;
+ this.timeout = null;
+ }
+ }
+
+ synchronized (triggerLock) {
context.triggerReadMoreEntries();
}
}
+ /**
+ * Cancel the current timer (if any) and schedule the timer task to run
after the given delay. Used by
+ * subclasses to trigger a dispatch round from asynchronous completions
instead of mutating the timer
+ * state directly.
+ */
+ protected final void rescheduleTimer(long delayMillis) {
+ synchronized (timeoutLock) {
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ currentTimeoutTarget = -1;
+ timeout = timer.newTimeout(this, delayMillis,
TimeUnit.MILLISECONDS);
+ }
+ }
+
@Override
public void close() {
- if (timeout != null) {
- timeout.cancel();
- timeout = null;
+ synchronized (timeoutLock) {
+ if (timeout != null) {
+ timeout.cancel();
+ timeout = null;
+ }
+ currentTimeoutTarget = -1;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index b1d4e8cecbd..8e9b8343704 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -62,6 +62,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
// The bit count to trim to reduce memory occupation.
private final int timestampPrecisionBitCnt;
+ private final long precisionMillis;
// Count of delayed messages in the tracker.
private final AtomicLong delayedMessagesCount = new AtomicLong(0);
@@ -91,6 +92,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
this.log = LOG.with().ctx(super.log).build();
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
this.timestampPrecisionBitCnt =
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+ this.precisionMillis = 1L << timestampPrecisionBitCnt;
}
/**
@@ -133,7 +135,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
.attr("entryId", entryId)
.attr("deliveryInMs", () -> deliverAt - clock.millis())
.log("Add message");
- long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+ long timestamp = roundTimestamp(deliverAt);
Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp,
k -> new TreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
@@ -142,7 +144,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
boolean isNew = !bitmap.contains(entryId);
if (isNew) {
- bitmap.add(entryId);
+ bitmap.addLong(entryId);
delayedMessagesCount.incrementAndGet();
}
@@ -153,6 +155,27 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
return true;
}
+ /**
+ * Round the deliverAt timestamp to the bucket boundary used as the key in
{@link #delayedMessageMap}, so that
+ * all messages within the same bucket share a single map entry to reduce
memory usage.
+ *
+ * In strict delivery mode the timestamp is rounded up: a bucket then
becomes due only after every deliverAt
+ * time inside it has passed, so messages are delivered up to one bucket
(less than tickTimeMillis) late, but
+ * never before their deliverAt time. Rounding down instead would let
{@link #getScheduledMessages(int)} hand a
+ * message to the dispatcher before its deliverAt time; the dispatcher
would put it back and re-trigger reads
+ * in a loop until the deliverAt time is reached (see issue #25996).
+ *
+ * In non-strict mode the timestamp is rounded down, since delivering up
to tickTimeMillis early is allowed.
+ */
+ private long roundTimestamp(long deliverAt) {
+ if (isDeliverAtTimeStrict()) {
+ // round up, saturating at Long.MAX_VALUE instead of overflowing
for deliverAt close to Long.MAX_VALUE
+ long roundedUp = deliverAt + precisionMillis - 1;
+ return trimLowerBit(roundedUp < deliverAt ? Long.MAX_VALUE :
roundedUp, timestampPrecisionBitCnt);
+ }
+ return trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+ }
+
/**
* Check that new delivery time comes after the current highest, or at
* least within a single tick time interval of 1 second.
@@ -198,20 +221,22 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
for (var ledgerEntry : ledgerMap.entrySet()) {
long ledgerId = ledgerEntry.getKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
- int cardinality = (int) entryIds.getLongCardinality();
+ long cardinality = entryIds.getLongCardinality();
if (cardinality <= n) {
+ int cardinalityInt = (int) cardinality;
entryIds.forEach(entryId -> {
positions.add(PositionFactory.create(ledgerId,
entryId));
});
- n -= cardinality;
- delayedMessagesCount.addAndGet(-cardinality);
+ n -= cardinalityInt;
+ delayedMessagesCount.addAndGet(-cardinalityInt);
ledgerIdToDelete.add(ledgerId);
} else {
- long[] entryIdsArray = entryIds.toArray();
- for (int i = 0; i < n; i++) {
- positions.add(PositionFactory.create(ledgerId,
entryIdsArray[i]));
- entryIds.removeLong(entryIdsArray[i]);
- }
+ Roaring64Bitmap entryIdsToRemove = new Roaring64Bitmap();
+ entryIds.stream().limit(n).forEach(entryId -> {
+ positions.add(PositionFactory.create(ledgerId,
entryId));
+ entryIdsToRemove.addLong(entryId);
+ });
+ entryIds.andNot(entryIdsToRemove);
delayedMessagesCount.addAndGet(-n);
n = 0;
}
@@ -226,10 +251,10 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
delayedMessageMap.remove(timestamp);
}
}
- log.debug()
- .attr("messagesCount", positions.size())
- .log("Get scheduled messages");
- if (delayedMessageMap.isEmpty()) {
+ log.debug()
+ .attr("messagesCount", positions.size())
+ .log("Get scheduled messages");
+ if (delayedMessageMap.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 19c774852c9..0b0b48b8de3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -712,12 +712,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
- synchronized (this) {
- if (timeout != null) {
- timeout.cancel();
- }
- timeout = timer.newTimeout(this, 0,
TimeUnit.MILLISECONDS);
- }
+ rescheduleTimer(0);
});
if (!checkPendingLoadDone() ||
loadFuture.isCompletedExceptionally()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 322992d7b1c..f8a9dd83042 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.delayed;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -33,13 +35,16 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.time.Clock;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -114,6 +119,39 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8,
clock,
true, 100)
}};
+ case "testStrictModeNeverDeliversEarlyAndKeepsTimerArmed",
+ "testStaleTimerTriggerDoesNotClearNewerTimer" -> {
+ // Mock timer that records the currently-armed timeouts so the
test can observe whether a
+ // delivery timer is live and fire it like the wheel would
(passing the armed Timeout instance,
+ // which the tracker's run() compares against its current
timeout). Cancelling a timeout removes
+ // it from the map; firing (polling) it does not mark it
cancelled, mirroring the wheel.
+ Timer mockTimer = mock(Timer.class);
+ NavigableMap<Long, Map.Entry<TimerTask, Timeout>> tasks = new
TreeMap<>();
+ when(mockTimer.newTimeout(any(), anyLong(),
any())).then(invocation -> {
+ TimerTask task = invocation.getArgument(0,
TimerTask.class);
+ long delay = invocation.getArgument(1, Long.class);
+ TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+ long scheduleAt = clockTime.get() + unit.toMillis(delay);
+ Timeout t = mock(Timeout.class);
+ Map.Entry<TimerTask, Timeout> entry = Map.entry(task, t);
+ AtomicBoolean cancelled = new AtomicBoolean();
+ when(t.cancel()).then(i -> {
+ cancelled.set(true);
+ return tasks.remove(scheduleAt, entry);
+ });
+ when(t.isCancelled()).then(i -> cancelled.get());
+ tasks.put(scheduleAt, entry);
+ return t;
+ });
+ // tickTimeMillis=1000 -> delivery timestamps are bucketed at
512ms granularity (lower 9 bits),
+ // rounded up in strict mode so that messages are never
visible before their deliverAt time.
+ yield new Object[][]{{
+ new InMemoryDelayedDeliveryTracker(dispatcher,
mockTimer, 1000, clock,
+ true, 0),
+ tasks,
+ mockTimer
+ }};
+ }
default -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
true, 0)
@@ -229,7 +267,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
true, 0) {
@Override
public void run(Timeout timeout) throws Exception {
- super.timeout = timer.newTimeout(this, 1,
TimeUnit.MILLISECONDS);
+ rescheduleTimer(1);
if (timeout == null || timeout.isCancelled()) {
return;
}
@@ -274,6 +312,106 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
tracker.close();
}
+ /**
+ * Regression test for https://github.com/apache/pulsar/issues/25996 and
for the strict-mode guarantee that
+ * messages are never delivered before their deliverAt time.
+ *
+ * With isDelayedDeliveryDeliverAtTimeStrict=true and tickTimeMillis=1000,
delivery timestamps are bucketed
+ * at 512ms granularity and rounded UP, so a bucket only becomes due once
every deliverAt time inside it has
+ * passed. Previously timestamps were rounded down, so a message could be
popped up to ~511ms early; the
+ * dispatcher would re-add the not-yet-due message and the re-add left a
stale {@code currentTimeoutTarget}
+ * behind that suppressed re-arming the delivery timer, stalling all
remaining delayed messages until an
+ * unrelated dispatch event occurred.
+ *
+ * The rounded-up buckets (multiples of 512ms) used below:
+ * M1 deliverAt=60400 -> bucket 60416
+ * M2 deliverAt=61000 -> bucket 61440
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void
testStrictModeNeverDeliversEarlyAndKeepsTimerArmed(InMemoryDelayedDeliveryTracker
tracker,
+ NavigableMap<Long, Map.Entry<TimerTask, Timeout>> tasks, Timer
mockTimer) throws Exception {
+ clockTime.set(0);
+
+ // Two delayed messages in different buckets. A delivery timer is
armed for the earliest.
+ assertTrue(tracker.addMessage(1, 1, 60400));
+ assertTrue(tracker.addMessage(2, 2, 61000));
+ assertEquals(tasks.size(), 1, "a delivery timer should be armed for
the earliest message");
+ assertEquals(tasks.firstKey().longValue(), 60416, "the timer should
target M1's rounded-up bucket");
+
+ // Before M1's bucket time, nothing may be visible to the dispatcher
(no early delivery), and a
+ // dispatch round that finds nothing must leave the delivery timer
armed (issue #25996).
+ clockTime.set(60000);
+ assertFalse(tracker.hasMessageAvailable());
+ assertTrue(tracker.getScheduledMessages(100).isEmpty(),
+ "strict mode must not deliver a message before its deliverAt
time");
+ assertEquals(tasks.size(), 1, "the delivery timer must remain armed");
+
+ // The timer fires at M1's bucket time; M1 is delivered at 60416 >=
deliverAt 60400, so the
+ // dispatcher never needs to re-add it.
+ clockTime.set(60416);
+ Map.Entry<TimerTask, Timeout> firedTimeout =
tasks.pollFirstEntry().getValue();
+ firedTimeout.getKey().run(firedTimeout.getValue());
+ Set<Position> scheduled = tracker.getScheduledMessages(100);
+ assertEquals(scheduled, Set.of(PositionFactory.create(1, 1)));
+
+ // M2 is still pending and not yet due, so a delivery timer must have
been re-armed for it. With the
+ // issue #25996 bug, the timer state went stale at this point and M2
stalled indefinitely.
+ assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+ assertFalse(tracker.hasMessageAvailable());
+ assertEquals(tasks.size(), 1, "a delivery timer must remain armed for
the pending message M2");
+ assertEquals(tasks.firstKey().longValue(), 61440, "the timer should
target M2's rounded-up bucket");
+
+ // The timer fires again and M2 is delivered, also never early.
+ clockTime.set(61440);
+ firedTimeout = tasks.pollFirstEntry().getValue();
+ firedTimeout.getKey().run(firedTimeout.getValue());
+ scheduled = tracker.getScheduledMessages(100);
+ assertEquals(scheduled, Set.of(PositionFactory.create(2, 2)));
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+
+ tracker.close();
+ }
+
+ /**
+ * A timeout that was superseded by a newer one may still fire:
HashedWheelTimer can run a task that passed
+ * its isCancelled() check just before updateTimer() cancelled it. Such a
stale trigger must not clear the
+ * state of the newer armed timer, otherwise the next updateTimer() call
would arm a duplicate timer.
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void
testStaleTimerTriggerDoesNotClearNewerTimer(InMemoryDelayedDeliveryTracker
tracker,
+ NavigableMap<Long, Map.Entry<TimerTask, Timeout>> tasks, Timer
mockTimer) throws Exception {
+ clockTime.set(0);
+
+ // Arm a timer for M2, then supersede it with an earlier message M1.
+ assertTrue(tracker.addMessage(2, 2, 61000));
+ assertEquals(tasks.firstKey().longValue(), 61440);
+ assertTrue(tracker.addMessage(1, 1, 60400));
+ assertEquals(tasks.size(), 1);
+ assertEquals(tasks.firstKey().longValue(), 60416, "M1's timer should
have replaced M2's");
+
+ // The superseded (cancelled) timeout for M2 fires anyway, racing with
the cancellation. The tracker
+ // must keep the state of the currently armed timer for M1.
+ Timeout staleTimeout = mock(Timeout.class);
+ tracker.run(staleTimeout);
+
+ // A subsequent updateTimer() (here through hasMessageAvailable())
must recognize the armed timer
+ // instead of arming a duplicate one: still exactly the two
newTimeout() calls from the adds above.
+ assertFalse(tracker.hasMessageAvailable());
+ assertEquals(tasks.size(), 1);
+ assertEquals(tasks.firstKey().longValue(), 60416);
+ verify(mockTimer, times(2)).newTimeout(any(), anyLong(), any());
+
+ // The armed timer fires and delivery proceeds normally.
+ clockTime.set(60416);
+ Map.Entry<TimerTask, Timeout> firedTimeout =
tasks.pollFirstEntry().getValue();
+ firedTimeout.getKey().run(firedTimeout.getValue());
+ Set<Position> scheduled = tracker.getScheduledMessages(100);
+ assertEquals(scheduled, Set.of(PositionFactory.create(1, 1)));
+ assertEquals(tasks.size(), 1, "a delivery timer must be re-armed for
the still pending M2");
+
+ tracker.close();
+ }
+
@Test(dataProvider = "delayedTracker")
public void
testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker)
throws Exception {
tracker.addMessage(1, 1, 50);
@@ -284,4 +422,50 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
tracker.getScheduledMessages(10);
}
+
+ /**
+ * Covers the partial drain of a per-ledger entry id bitmap in
getScheduledMessages, where the bitmap holds
+ * more entries than the remaining maxMessages budget (the cardinality > n
branch): only the lowest n entry
+ * ids may be returned and the rest must stay tracked, without duplicates
across calls.
+ */
+ @Test(dataProvider = "delayedTracker")
+ public void
testGetScheduledMessagesWithMaxMessagesSmallerThanBucket(InMemoryDelayedDeliveryTracker
tracker)
+ throws Exception {
+ clockTime.set(0);
+
+ // Two ledgers within the same delivery bucket: ledger 1 with 2
entries, ledger 2 with 5 entries.
+ assertTrue(tracker.addMessage(1, 0, 10));
+ assertTrue(tracker.addMessage(1, 1, 10));
+ for (int entryId = 0; entryId < 5; entryId++) {
+ assertTrue(tracker.addMessage(2, entryId, 10));
+ }
+ assertEquals(tracker.getNumberOfDelayedMessages(), 7);
+
+ clockTime.set(10);
+
+ // maxMessages drains ledger 1 fully and ledger 2 partially
(cardinality > n on ledger 2's bitmap).
+ Set<Position> scheduled = tracker.getScheduledMessages(4);
+ assertEquals(scheduled, Set.of(
+ PositionFactory.create(1, 0),
+ PositionFactory.create(1, 1),
+ PositionFactory.create(2, 0),
+ PositionFactory.create(2, 1)));
+ assertEquals(tracker.getNumberOfDelayedMessages(), 3);
+
+ // Another partial drain of ledger 2's remaining entries (cardinality
> n again): continues with the
+ // next lowest entry ids, no duplicates from the previous call.
+ scheduled = tracker.getScheduledMessages(2);
+ assertEquals(scheduled, Set.of(
+ PositionFactory.create(2, 2),
+ PositionFactory.create(2, 3)));
+ assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+
+ // The last remaining entry is returned and the tracker is emptied.
+ scheduled = tracker.getScheduledMessages(10);
+ assertEquals(scheduled, Set.of(PositionFactory.create(2, 4)));
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+ assertFalse(tracker.hasMessageAvailable());
+
+ tracker.close();
+ }
}