This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c48a3243287 Avoid tracking the delays of all the messages when we
detect that they are fixed (#16609)
c48a3243287 is described below
commit c48a3243287c7d775459b6437d9f4b24ed44cf4c
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 15 09:01:41 2022 -0700
Avoid tracking the delays of all the messages when we detect that they are
fixed (#16609)
* Avoid tracking the delays of all the messages when we detect that they are
fixed
* Use tick time to avoid clock skews across different producers
---
.../broker/delayed/DelayedDeliveryTracker.java | 5 +
.../delayed/InMemoryDelayedDeliveryTracker.java | 47 ++++++++-
.../broker/service/AbstractBaseDispatcher.java | 3 +-
.../PersistentDispatcherMultipleConsumers.java | 17 +++-
.../delayed/InMemoryDeliveryTrackerTest.java | 113 +++++++++++++++++++++
5 files changed, 179 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 2fbd9a51d4a..35853d3599b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -55,6 +55,11 @@ public interface DelayedDeliveryTracker extends
AutoCloseable {
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);
+ /**
+ * Tells whether the dispatcher should pause any message deliveries, until
the DelayedDeliveryTracker has
+ * more messages available.
+ */
+ boolean shouldPauseAllDeliveries();
/**
* Reset tick time use zk policies cache.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 92df563dad4..837d3d1872c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -55,6 +55,20 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+ // If we detect that all messages have a fixed delay time, such that the
delivery is
+ // always going to be in FIFO order, then we can avoid pulling all the
messages into
+ // the tracker. Instead, we use the lookahead for detection and pause the read
from
+ // the cursor if the delays are fixed.
+ public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
+
+ // This is the timestamp of the message with the highest delivery time.
+ // If newly added messages are lower than this, it means the delivery is
requested
+ // to be out-of-order. It gets reset to 0 once the tracker is emptied.
+ private long highestDeliveryTimeTracked = 0;
+
+ // Track whether we have seen all messages with fixed delay so far.
+ private boolean messagesHaveFixedDelay = true;
+
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher, Timer timer, long tickTimeMillis,
boolean
isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict);
@@ -86,16 +100,28 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
@Override
public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+ if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+ messagesHaveFixedDelay = false;
+ return false;
+ }
+
if (log.isDebugEnabled()) {
log.debug("[{}] Add message {}:{} -- Delivery in {} ms ",
dispatcher.getName(), ledgerId, entryId,
deliverAt - clock.millis());
}
- if (deliverAt <= getCutoffTime()) {
- return false;
- }
+
priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();
+
+ // Check that the new delivery time comes after the current highest, or at
+ // least within a single tick time interval (tickTimeMillis) of it.
+ if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
+ messagesHaveFixedDelay = false;
+ }
+
+ highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked,
deliverAt);
+
return true;
}
@@ -137,6 +163,13 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
if (log.isDebugEnabled()) {
log.debug("[{}] Get scheduled messages - found {}",
dispatcher.getName(), positions.size());
}
+
+ if (priorityQueue.isEmpty()) {
+ // Reset to initial state
+ highestDeliveryTimeTracked = 0;
+ messagesHaveFixedDelay = true;
+ }
+
updateTimer();
return positions;
}
@@ -241,4 +274,12 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
timeout.cancel();
}
}
+
+ @Override
+ public boolean shouldPauseAllDeliveries() {
+ // Pause deliveries if we know all delays are fixed within the
lookahead window
+ return messagesHaveFixedDelay
+ && priorityQueue.size() >=
DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
+ && !hasMessageAvailable();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 0c7a6641216..d9f36bf0643 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -177,8 +177,7 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
entry.release();
individualAcknowledgeMessageIfNeeded(pos,
Collections.emptyMap());
continue;
- } else if (msgMetadata.hasDeliverAtTime()
- && trackDelayedDelivery(entry.getLedgerId(),
entry.getEntryId(), msgMetadata)) {
+ } else if (trackDelayedDelivery(entry.getLedgerId(),
entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6af58557a83..a770e76fc43 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -236,6 +236,10 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
public synchronized void readMoreEntries() {
+ if (shouldPauseDeliveryForDelayTracker()) {
+ return;
+ }
+
// totalAvailablePermits may be updated by other threads
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits,
firstAvailableConsumerPermits);
@@ -874,13 +878,20 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
synchronized (this) {
if (!delayedDeliveryTracker.isPresent()) {
+ if (!msgMetadata.hasDeliverAtTime()) {
+ // No need to initialize the tracker here
+ return false;
+ }
+
// Initialize the tracker the first time we need to use it
delayedDeliveryTracker = Optional
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
- return delayedDeliveryTracker.get().addMessage(ledgerId, entryId,
msgMetadata.getDeliverAtTime());
+
+ long deliverAtTime = msgMetadata.hasDeliverAtTime() ?
msgMetadata.getDeliverAtTime() : -1L;
+ return delayedDeliveryTracker.get().addMessage(ledgerId, entryId,
deliverAtTime);
}
}
@@ -895,6 +906,10 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
}
+ protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
+ return delayedDeliveryTracker.isPresent() &&
delayedDeliveryTracker.get().shouldPauseAllDeliveries();
+ }
+
@Override
public synchronized long getNumberOfDelayedMessages() {
return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index f44f61a67f9..db2db6cc1db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -314,4 +314,117 @@ public class InMemoryDeliveryTrackerTest {
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> verify(dispatcher).readMoreEntries());
}
+
+ @Test
+ public void testWithFixedDelays() throws Exception {
+ PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+ true);
+
+ assertFalse(tracker.hasMessageAvailable());
+
+ assertTrue(tracker.addMessage(1, 1, 10));
+ assertTrue(tracker.addMessage(2, 2, 20));
+ assertTrue(tracker.addMessage(3, 3, 30));
+ assertTrue(tracker.addMessage(4, 4, 40));
+ assertTrue(tracker.addMessage(5, 5, 50));
+
+ assertFalse(tracker.hasMessageAvailable());
+ assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+ assertFalse(tracker.shouldPauseAllDeliveries());
+
+ for (int i = 6; i <=
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+ assertTrue(tracker.addMessage(i, i, i * 10));
+ }
+
+ assertTrue(tracker.shouldPauseAllDeliveries());
+
+
clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
* 10);
+
+ tracker.getScheduledMessages(100);
+ assertFalse(tracker.shouldPauseAllDeliveries());
+
+ // Empty the tracker
+ int removed = 0;
+ do {
+ removed = tracker.getScheduledMessages(100).size();
+ } while (removed > 0);
+
+ assertFalse(tracker.shouldPauseAllDeliveries());
+ }
+
+ @Test
+ public void testWithMixedDelays() throws Exception {
+ PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+ true);
+
+ assertFalse(tracker.hasMessageAvailable());
+
+ assertTrue(tracker.addMessage(1, 1, 10));
+ assertTrue(tracker.addMessage(2, 2, 20));
+ assertTrue(tracker.addMessage(3, 3, 30));
+ assertTrue(tracker.addMessage(4, 4, 40));
+ assertTrue(tracker.addMessage(5, 5, 50));
+
+ assertFalse(tracker.shouldPauseAllDeliveries());
+
+ for (int i = 6; i <=
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+ assertTrue(tracker.addMessage(i, i, i * 10));
+ }
+
+ assertTrue(tracker.shouldPauseAllDeliveries());
+
+ // Add message with earlier delivery time
+ assertTrue(tracker.addMessage(5, 5, 5));
+
+ assertFalse(tracker.shouldPauseAllDeliveries());
+ }
+
+ @Test
+ public void testWithNoDelays() throws Exception {
+ PersistentDispatcherMultipleConsumers dispatcher =
mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ @Cleanup
+ InMemoryDelayedDeliveryTracker tracker = new
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+ true);
+
+ assertFalse(tracker.hasMessageAvailable());
+
+ assertTrue(tracker.addMessage(1, 1, 10));
+ assertTrue(tracker.addMessage(2, 2, 20));
+ assertTrue(tracker.addMessage(3, 3, 30));
+ assertTrue(tracker.addMessage(4, 4, 40));
+ assertTrue(tracker.addMessage(5, 5, 50));
+
+ assertFalse(tracker.shouldPauseAllDeliveries());
+
+ for (int i = 6; i <=
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+ assertTrue(tracker.addMessage(i, i, i * 10));
+ }
+
+ assertTrue(tracker.shouldPauseAllDeliveries());
+
+ // Add message with no-delay
+ assertFalse(tracker.addMessage(5, 5, -1L));
+
+ assertFalse(tracker.shouldPauseAllDeliveries());
+ }
+
}