This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea4252a493 [fix][broker] Avoid introducing delay when there are 
delayed messages or marker messages (#23343)
5ea4252a493 is described below

commit 5ea4252a493c5d93046cfc3aeb1977814bc64a41
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 25 08:28:21 2024 +0300

    [fix][broker] Avoid introducing delay when there are delayed messages or 
marker messages (#23343)
---
 .../PersistentDispatcherMultipleConsumers.java     | 34 ++++++---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 11 +--
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 82 ++++++++++++++++++++++
 3 files changed, 112 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 8fdb65e7b30..73ad2cf0a3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -134,7 +134,11 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     private AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
-    protected int lastNumberOfEntriesDispatched;
+    // tracks how many entries were processed by consumers in the last 
trySendMessagesToConsumers call
+    // the number includes also delayed messages, marker messages, aborted txn 
messages and filtered messages
+    // When no messages were processed, the value is 0. This is also an 
indication that the dispatcher didn't
+    // make progress in the last trySendMessagesToConsumers call.
+    protected int lastNumberOfEntriesProcessed;
     protected boolean skipNextBackoff;
     private final Backoff retryBackoff;
     protected enum ReadType {
@@ -727,19 +731,22 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                                                                   boolean 
needAcquireSendInProgress,
                                                                   long 
totalBytesSize) {
         boolean triggerReadingMore = sendMessagesToConsumers(readType, 
entries, needAcquireSendInProgress);
-        int entriesDispatched = lastNumberOfEntriesDispatched;
+        int entriesProcessed = lastNumberOfEntriesProcessed;
         updatePendingBytesToDispatch(-totalBytesSize);
-        if (entriesDispatched > 0) {
-            // Reset the backoff when we successfully dispatched messages
+        boolean canReadMoreImmediately = false;
+        if (entriesProcessed > 0 || skipNextBackoff) {
+            // Reset the backoff when messages were processed
             retryBackoff.reset();
+            // Reset the possible flag to skip the backoff delay
+            skipNextBackoff = false;
+            canReadMoreImmediately = true;
         }
         if (triggerReadingMore) {
-            if (entriesDispatched > 0 || skipNextBackoff) {
-                skipNextBackoff = false;
+            if (canReadMoreImmediately) {
                 // Call readMoreEntries in the same thread to trigger the next 
read
                 readMoreEntries();
-            } else if (entriesDispatched == 0) {
-                // If no messages were dispatched, we need to reschedule a new 
read with an increasing backoff delay
+            } else {
+                // reschedule a new read with an increasing backoff delay
                 reScheduleReadWithBackoff();
             }
         }
@@ -779,7 +786,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
-        lastNumberOfEntriesDispatched = 0;
+        lastNumberOfEntriesProcessed = 0;
 
         int entriesToDispatch = entries.size();
         // Trigger read more messages
@@ -809,6 +816,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
+        long totalEntriesProcessed = 0;
         int avgBatchSizePerMsg = remainingMessages > 0 ? 
Math.max(remainingMessages / entries.size(), 1) : 1;
 
         // If the dispatcher is closed, firstAvailableConsumerPermits will be 
0, which skips dispatching the
@@ -820,6 +828,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 log.info("[{}] rewind because no available consumer found from 
total {}", name, consumerList.size());
                 entries.subList(start, entries.size()).forEach(Entry::release);
                 cursor.rewind();
+                lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
                 return false;
             }
 
@@ -863,6 +872,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             totalEntries += filterEntriesForConsumer(metadataArray, start,
                     entriesForThisConsumer, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor,
                     readType == ReadType.Replay, c);
+            totalEntriesProcessed += entriesForThisConsumer.size();
 
             c.sendMessages(entriesForThisConsumer, batchSizes, 
batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
@@ -882,7 +892,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        lastNumberOfEntriesDispatched = (int) totalEntries;
+        lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
         if (entriesToDispatch > 0) {
@@ -917,6 +927,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
+        long totalEntriesProcessed = 0;
         final AtomicInteger numConsumers = new 
AtomicInteger(assignResult.size());
         for (Map.Entry<Consumer, List<EntryAndMetadata>> current : 
assignResult.entrySet()) {
             final Consumer consumer = current.getKey();
@@ -947,6 +958,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
             totalEntries += filterEntriesForConsumer(entryAndMetadataList, 
batchSizes, sendMessageInfo,
                     batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
+            totalEntriesProcessed += entryAndMetadataList.size();
             consumer.sendMessages(entryAndMetadataList, batchSizes, 
batchIndexesAcks,
                     sendMessageInfo.getTotalMessages(), 
sendMessageInfo.getTotalBytes(),
                     sendMessageInfo.getTotalChunkedMessages(), 
getRedeliveryTracker()
@@ -962,7 +974,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        lastNumberOfEntriesDispatched = (int) totalEntries;
+        lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
         return numConsumers.get() == 0; // trigger a new readMoreEntries() call
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 26463ba902c..ecd3f19a140 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -190,10 +190,11 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     @Override
     protected synchronized boolean trySendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
-        lastNumberOfEntriesDispatched = 0;
+        lastNumberOfEntriesProcessed = 0;
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
+        long totalEntriesProcessed = 0;
         int entriesCount = entries.size();
 
         // Trigger read more messages
@@ -233,6 +234,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                         } else if (readType == ReadType.Replay) {
                             entries.forEach(Entry::release);
                         }
+                        skipNextBackoff = true;
                         return true;
                     }
                 }
@@ -298,6 +300,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(entriesForConsumer.size());
             totalEntries += filterEntriesForConsumer(entriesForConsumer, 
batchSizes, sendMessageInfo,
                     batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
+            totalEntriesProcessed += entriesForConsumer.size();
             consumer.sendMessages(entriesForConsumer, batchSizes, 
batchIndexesAcks,
                     sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
@@ -368,7 +371,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             }
         }
 
-        lastNumberOfEntriesDispatched = (int) totalEntries;
+        lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
 
         // acquire message-dispatch permits for already delivered messages
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
@@ -387,8 +390,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             return true;
         }
 
-        // if no messages were sent, we should retry after a backoff delay
-        if (entriesByConsumerForDispatching.size() == 0) {
+        // if no messages were sent to consumers, we should retry
+        if (totalEntries == 0) {
             return true;
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index b78d1e554c3..dcd852f409d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -46,6 +46,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -996,6 +998,86 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
     }
 
+
+    @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+    public void testNoBackoffDelayWhenDelayedMessages(boolean 
dispatchMessagesInSubscriptionThread, boolean isKeyShared)
+            throws Exception {
+        persistentDispatcher.close();
+
+        doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
+                .isDispatcherDispatchMessagesInSubscriptionThread();
+
+        AtomicInteger readMoreEntriesCalled = new AtomicInteger(0);
+        AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0);
+        AtomicBoolean delayAllMessages = new AtomicBoolean(true);
+
+        PersistentDispatcherMultipleConsumers dispatcher;
+        if (isKeyShared) {
+            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+                    topicMock, cursorMock, subscriptionMock, configMock,
+                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+                @Override
+                protected void reScheduleReadInMs(long readAfterMs) {
+                    reScheduleReadInMsCalled.incrementAndGet();
+                }
+
+                @Override
+                public synchronized void readMoreEntries() {
+                    readMoreEntriesCalled.incrementAndGet();
+                }
+
+                @Override
+                public boolean trackDelayedDelivery(long ledgerId, long 
entryId, MessageMetadata msgMetadata) {
+                    if (delayAllMessages.get()) {
+                        // simulate delayed message
+                        return true;
+                    }
+                    return super.trackDelayedDelivery(ledgerId, entryId, 
msgMetadata);
+                }
+            };
+        } else {
+            dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, 
cursorMock, subscriptionMock) {
+                @Override
+                protected void reScheduleReadInMs(long readAfterMs) {
+                    reScheduleReadInMsCalled.incrementAndGet();
+                }
+
+                @Override
+                public synchronized void readMoreEntries() {
+                    readMoreEntriesCalled.incrementAndGet();
+                }
+
+                @Override
+                public boolean trackDelayedDelivery(long ledgerId, long 
entryId, MessageMetadata msgMetadata) {
+                    if (delayAllMessages.get()) {
+                        // simulate delayed message
+                        return true;
+                    }
+                    return super.trackDelayedDelivery(ledgerId, entryId, 
msgMetadata);
+                }
+            };
+        }
+
+        doAnswer(invocationOnMock -> {
+            GenericFutureListener<Future<Void>> listener = 
invocationOnMock.getArgument(0);
+            Future<Void> future = mock(Future.class);
+            when(future.isDone()).thenReturn(true);
+            listener.operationComplete(future);
+            return channelMock;
+        }).when(channelMock).addListener(any());
+
+        // add a consumer with permits
+        consumerMockAvailablePermits.set(1000);
+        dispatcher.addConsumer(consumerMock);
+
+        List<Entry> entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, 
createMessage("message1", 1))));
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(reScheduleReadInMsCalled.get(), 0, 
"reScheduleReadInMs should not be called");
+            assertTrue(readMoreEntriesCalled.get() >= 1);
+        });
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }

Reply via email to