This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d9598b501b2 [fix][broker] Fix retry backoff for 
PersistentDispatcherMultipleConsumers (#23284)
d9598b501b2 is described below

commit d9598b501b234a986e93a6609c5e62f6d05bd234
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 11 14:31:43 2024 +0300

    [fix][broker] Fix retry backoff for PersistentDispatcherMultipleConsumers 
(#23284)
    
    (cherry picked from commit 8151639dccbbacee5ec23678cf7b5e0a768902dd)
---
 .../PersistentDispatcherMultipleConsumers.java     |  19 +--
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 140 +++++++++++++++++----
 2 files changed, 129 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 23c4cdd84c2..cd5acd069e7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -406,12 +406,15 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Reschedule message read in {} ms", 
topic.getName(), name, readAfterMs);
             }
-            topic.getBrokerService().executor().schedule(
-                    () -> {
-                        isRescheduleReadInProgress.set(false);
-                        readMoreEntries();
-                        },
-                    readAfterMs, TimeUnit.MILLISECONDS);
+            Runnable runnable = () -> {
+                isRescheduleReadInProgress.set(false);
+                readMoreEntries();
+            };
+            if (readAfterMs > 0) {
+                topic.getBrokerService().executor().schedule(runnable, 
readAfterMs, TimeUnit.MILLISECONDS);
+            } else {
+                topic.getBrokerService().executor().execute(runnable);
+            }
         }
     }
 
@@ -795,6 +798,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
+        lastNumberOfEntriesDispatched = (int) totalEntries;
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
         if (entriesToDispatch > 0) {
@@ -807,9 +811,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
                 entry.release();
             });
-
-            lastNumberOfEntriesDispatched = entriesToDispatch;
         }
+
         return true;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index f6431b38fb6..f7326734eaa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -98,6 +98,8 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
     final String topicName = "persistent://public/default/testTopic";
     final String subscriptionName = "testSubscription";
     private AtomicInteger consumerMockAvailablePermits;
+    int retryBackoffInitialTimeInMs = 10;
+    int retryBackoffMaxTimeInMs = 50;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -107,8 +109,8 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
         
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
         
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
-        
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
-        doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
+        doAnswer(invocation -> 
retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
+        doAnswer(invocation -> 
retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
         pulsarMock = mock(PulsarService.class);
         doReturn(configMock).when(pulsarMock).getConfiguration();
 
@@ -459,34 +461,45 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         allEntries.forEach(entry -> entry.release());
     }
 
-    @DataProvider(name = "dispatchMessagesInSubscriptionThread")
-    private Object[][] dispatchMessagesInSubscriptionThread() {
-        return new Object[][] { { false }, { true } };
+    @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
+    private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
+        return new Object[][] { { false, true }, { true, true }, { true, false 
}, { false, false } };
     }
 
-    @Test(dataProvider = "dispatchMessagesInSubscriptionThread")
-    public void testBackoffDelayWhenNoMessagesDispatched(boolean 
dispatchMessagesInSubscriptionThread)
+    @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+    public void testBackoffDelayWhenNoMessagesDispatched(boolean 
dispatchMessagesInSubscriptionThread, boolean isKeyShared)
             throws Exception {
         persistentDispatcher.close();
 
         List<Long> retryDelays = new CopyOnWriteArrayList<>();
         
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
-        persistentDispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(
-                topicMock, cursorMock, subscriptionMock, configMock,
-                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
-            @Override
-            protected void reScheduleReadInMs(long readAfterMs) {
-                retryDelays.add(readAfterMs);
-            }
-        };
+
+        PersistentDispatcherMultipleConsumers dispatcher;
+        if (isKeyShared) {
+            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+                    topicMock, cursorMock, subscriptionMock, configMock,
+                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+                @Override
+                protected void reScheduleReadInMs(long readAfterMs) {
+                    retryDelays.add(readAfterMs);
+                }
+            };
+        } else {
+            dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, 
cursorMock, subscriptionMock) {
+                @Override
+                protected void reScheduleReadInMs(long readAfterMs) {
+                    retryDelays.add(readAfterMs);
+                }
+            };
+        }
 
         // add a consumer without permits to trigger the retry behavior
         consumerMockAvailablePermits.set(0);
-        persistentDispatcher.addConsumer(consumerMock);
+        dispatcher.addConsumer(consumerMock);
 
         // call "readEntriesComplete" directly to test the retry behavior
         List<Entry> entries = List.of(EntryImpl.create(1, 1, 
createMessage("message1", 1)));
-        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
         Awaitility.await().untilAsserted(() -> {
                     assertEquals(retryDelays.size(), 1);
                     assertEquals(retryDelays.get(0), 10, "Initial retry delay 
should be 10ms");
@@ -494,7 +507,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
         // test the second retry delay
         entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)));
-        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
         Awaitility.await().untilAsserted(() -> {
                     assertEquals(retryDelays.size(), 2);
                     double delay = retryDelays.get(1);
@@ -504,7 +517,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         // verify the max retry delay
         for (int i = 0; i < 100; i++) {
             entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)));
-            persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+            dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
         }
         Awaitility.await().untilAsserted(() -> {
                     assertEquals(retryDelays.size(), 102);
@@ -515,14 +528,14 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         // unblock to check that the retry delay is reset
         consumerMockAvailablePermits.set(1000);
         entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, 
"key2")));
-        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
         // wait that the possibly async handling has completed
-        Awaitility.await().untilAsserted(() -> 
assertFalse(persistentDispatcher.isSendInProgress()));
+        Awaitility.await().untilAsserted(() -> 
assertFalse(dispatcher.isSendInProgress()));
 
         // now block again to check the next retry delay so verify it was reset
         consumerMockAvailablePermits.set(0);
         entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, 
"key3")));
-        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
         Awaitility.await().untilAsserted(() -> {
                     assertEquals(retryDelays.size(), 103);
                     assertEquals(retryDelays.get(0), 10, "Resetted retry delay 
should be 10ms");
@@ -530,6 +543,89 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
     }
 
+    @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+    public void testBackoffDelayWhenRetryDelayDisabled(boolean 
dispatchMessagesInSubscriptionThread, boolean isKeyShared)
+            throws Exception {
+        persistentDispatcher.close();
+
+        // it should be possible to disable the retry delay
+        // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs 
to 0
+        retryBackoffInitialTimeInMs=0;
+        retryBackoffMaxTimeInMs=0;
+
+        List<Long> retryDelays = new CopyOnWriteArrayList<>();
+        doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
+                .isDispatcherDispatchMessagesInSubscriptionThread();
+
+        PersistentDispatcherMultipleConsumers dispatcher;
+        if (isKeyShared) {
+            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+                    topicMock, cursorMock, subscriptionMock, configMock,
+                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+                @Override
+                protected void reScheduleReadInMs(long readAfterMs) {
+                    retryDelays.add(readAfterMs);
+                }
+            };
+        } else {
+            dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, 
cursorMock, subscriptionMock) {
+                @Override
+                protected void reScheduleReadInMs(long readAfterMs) {
+                    retryDelays.add(readAfterMs);
+                }
+            };
+        }
+
+        // add a consumer without permits to trigger the retry behavior
+        consumerMockAvailablePermits.set(0);
+        dispatcher.addConsumer(consumerMock);
+
+        // call "readEntriesComplete" directly to test the retry behavior
+        List<Entry> entries = List.of(EntryImpl.create(1, 1, 
createMessage("message1", 1)));
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 1);
+                    assertEquals(retryDelays.get(0), 0, "Initial retry delay 
should be 0ms");
+                }
+        );
+        // test the second retry delay
+        entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)));
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 2);
+                    double delay = retryDelays.get(1);
+                    assertEquals(delay, 0, 0, "Second retry delay should be 
0ms");
+                }
+        );
+        // verify the max retry delay
+        for (int i = 0; i < 100; i++) {
+            entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)));
+            dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        }
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 102);
+                    double delay = retryDelays.get(101);
+                    assertEquals(delay, 0, 0, "Max delay should be 0ms");
+                }
+        );
+        // unblock to check that the retry delay is reset
+        consumerMockAvailablePermits.set(1000);
+        entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, 
"key2")));
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        // wait that the possibly async handling has completed
+        Awaitility.await().untilAsserted(() -> 
assertFalse(dispatcher.isSendInProgress()));
+
+        // now block again to check the next retry delay so verify it was reset
+        consumerMockAvailablePermits.set(0);
+        entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, 
"key3")));
+        dispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 103);
+                    assertEquals(retryDelays.get(0), 0, "Resetted retry delay 
should be 0ms");
+                }
+        );
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }

Reply via email to