This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea4252a493 [fix][broker] Avoid introducing delay when there are
delayed messages or marker messages (#23343)
5ea4252a493 is described below
commit 5ea4252a493c5d93046cfc3aeb1977814bc64a41
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 25 08:28:21 2024 +0300
[fix][broker] Avoid introducing delay when there are delayed messages or
marker messages (#23343)
---
.../PersistentDispatcherMultipleConsumers.java | 34 ++++++---
...istentStickyKeyDispatcherMultipleConsumers.java | 11 +--
...ntStickyKeyDispatcherMultipleConsumersTest.java | 82 ++++++++++++++++++++++
3 files changed, 112 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 8fdb65e7b30..73ad2cf0a3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -134,7 +134,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
- protected int lastNumberOfEntriesDispatched;
+ // tracks how many entries were processed by consumers in the last trySendMessagesToConsumers call
+ // the number includes also delayed messages, marker messages, aborted txn messages and filtered messages
+ // When no messages were processed, the value is 0. This is also an indication that the dispatcher didn't
+ // make progress in the last trySendMessagesToConsumers call.
+ protected int lastNumberOfEntriesProcessed;
protected boolean skipNextBackoff;
private final Backoff retryBackoff;
protected enum ReadType {
@@ -727,19 +731,22 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
boolean needAcquireSendInProgress,
long totalBytesSize) {
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
- int entriesDispatched = lastNumberOfEntriesDispatched;
+ int entriesProcessed = lastNumberOfEntriesProcessed;
updatePendingBytesToDispatch(-totalBytesSize);
- if (entriesDispatched > 0) {
- // Reset the backoff when we successfully dispatched messages
+ boolean canReadMoreImmediately = false;
+ if (entriesProcessed > 0 || skipNextBackoff) {
+ // Reset the backoff when messages were processed
retryBackoff.reset();
+ // Reset the possible flag to skip the backoff delay
+ skipNextBackoff = false;
+ canReadMoreImmediately = true;
}
if (triggerReadingMore) {
- if (entriesDispatched > 0 || skipNextBackoff) {
- skipNextBackoff = false;
+ if (canReadMoreImmediately) {
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
- } else if (entriesDispatched == 0) {
- // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
+ } else {
+ // reschedule a new read with an increasing backoff delay
reScheduleReadWithBackoff();
}
}
@@ -779,7 +786,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
- lastNumberOfEntriesDispatched = 0;
+ lastNumberOfEntriesProcessed = 0;
int entriesToDispatch = entries.size();
// Trigger read more messages
@@ -809,6 +816,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
+ long totalEntriesProcessed = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ?
Math.max(remainingMessages / entries.size(), 1) : 1;
// If the dispatcher is closed, firstAvailableConsumerPermits will be
0, which skips dispatching the
@@ -820,6 +828,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
+ lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
return false;
}
@@ -863,6 +872,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalEntries += filterEntriesForConsumer(metadataArray, start,
entriesForThisConsumer, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor,
readType == ReadType.Replay, c);
+ totalEntriesProcessed += entriesForThisConsumer.size();
c.sendMessages(entriesForThisConsumer, batchSizes,
batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
@@ -882,7 +892,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- lastNumberOfEntriesDispatched = (int) totalEntries;
+ lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
if (entriesToDispatch > 0) {
@@ -917,6 +927,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
+ long totalEntriesProcessed = 0;
final AtomicInteger numConsumers = new
AtomicInteger(assignResult.size());
for (Map.Entry<Consumer, List<EntryAndMetadata>> current :
assignResult.entrySet()) {
final Consumer consumer = current.getKey();
@@ -947,6 +958,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalEntries += filterEntriesForConsumer(entryAndMetadataList,
batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay,
consumer);
+ totalEntriesProcessed += entryAndMetadataList.size();
consumer.sendMessages(entryAndMetadataList, batchSizes,
batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()
@@ -962,7 +974,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- lastNumberOfEntriesDispatched = (int) totalEntries;
+ lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
return numConsumers.get() == 0; // trigger a new readMoreEntries() call
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 26463ba902c..ecd3f19a140 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -190,10 +190,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
- lastNumberOfEntriesDispatched = 0;
+ lastNumberOfEntriesProcessed = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
+ long totalEntriesProcessed = 0;
int entriesCount = entries.size();
// Trigger read more messages
@@ -233,6 +234,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
+ skipNextBackoff = true;
return true;
}
}
@@ -298,6 +300,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(entriesForConsumer.size());
totalEntries += filterEntriesForConsumer(entriesForConsumer,
batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay,
consumer);
+ totalEntriesProcessed += entriesForConsumer.size();
consumer.sendMessages(entriesForConsumer, batchSizes,
batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
@@ -368,7 +371,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
}
- lastNumberOfEntriesDispatched = (int) totalEntries;
+ lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
@@ -387,8 +390,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
return true;
}
- // if no messages were sent, we should retry after a backoff delay
- if (entriesByConsumerForDispatching.size() == 0) {
+ // if no messages were sent to consumers, we should retry
+ if (totalEntries == 0) {
return true;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index b78d1e554c3..dcd852f409d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -46,6 +46,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@@ -996,6 +998,86 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
);
}
+
+ @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+ public void testNoBackoffDelayWhenDelayedMessages(boolean
dispatchMessagesInSubscriptionThread, boolean isKeyShared)
+ throws Exception {
+ persistentDispatcher.close();
+
+ doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
+ .isDispatcherDispatchMessagesInSubscriptionThread();
+
+ AtomicInteger readMoreEntriesCalled = new AtomicInteger(0);
+ AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0);
+ AtomicBoolean delayAllMessages = new AtomicBoolean(true);
+
+ PersistentDispatcherMultipleConsumers dispatcher;
+ if (isKeyShared) {
+ dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ reScheduleReadInMsCalled.incrementAndGet();
+ }
+
+ @Override
+ public synchronized void readMoreEntries() {
+ readMoreEntriesCalled.incrementAndGet();
+ }
+
+ @Override
+ public boolean trackDelayedDelivery(long ledgerId, long
entryId, MessageMetadata msgMetadata) {
+ if (delayAllMessages.get()) {
+ // simulate delayed message
+ return true;
+ }
+ return super.trackDelayedDelivery(ledgerId, entryId,
msgMetadata);
+ }
+ };
+ } else {
+ dispatcher = new PersistentDispatcherMultipleConsumers(topicMock,
cursorMock, subscriptionMock) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ reScheduleReadInMsCalled.incrementAndGet();
+ }
+
+ @Override
+ public synchronized void readMoreEntries() {
+ readMoreEntriesCalled.incrementAndGet();
+ }
+
+ @Override
+ public boolean trackDelayedDelivery(long ledgerId, long
entryId, MessageMetadata msgMetadata) {
+ if (delayAllMessages.get()) {
+ // simulate delayed message
+ return true;
+ }
+ return super.trackDelayedDelivery(ledgerId, entryId,
msgMetadata);
+ }
+ };
+ }
+
+ doAnswer(invocationOnMock -> {
+ GenericFutureListener<Future<Void>> listener =
invocationOnMock.getArgument(0);
+ Future<Void> future = mock(Future.class);
+ when(future.isDone()).thenReturn(true);
+ listener.operationComplete(future);
+ return channelMock;
+ }).when(channelMock).addListener(any());
+
+ // add a consumer with permits
+ consumerMockAvailablePermits.set(1000);
+ dispatcher.addConsumer(consumerMock);
+
+ List<Entry> entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, createMessage("message1", 1))));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(reScheduleReadInMsCalled.get(), 0, "reScheduleReadInMs should not be called");
+ assertTrue(readMoreEntriesCalled.get() >= 1);
+ });
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}