This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 6404cc15dd8 [fix] Key_Shared mode consumption latency when low traffic (#23340)
6404cc15dd8 is described below

commit 6404cc15dd8ca6e95c2741bf97794c38a2d16d9c
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 24 03:50:01 2024 +0800

    [fix] Key_Shared mode consumption latency when low traffic (#23340)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 4ce0c752cc4b2d6dccb818ab0ffa854e82e42b85)
---
 conf/broker.conf                                                    | 4 ++--
 conf/standalone.conf                                                | 4 ++--
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java    | 4 ++--
 .../service/persistent/PersistentDispatcherMultipleConsumers.java   | 6 ++++--
 4 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 12c97d63068..4d6f54a5e66 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -470,12 +470,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
 # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
 # delay. This parameter sets the initial backoff delay in milliseconds.
-dispatcherRetryBackoffInitialTimeInMs=100
+dispatcherRetryBackoffInitialTimeInMs=1
 
 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
 # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
 # delay. This parameter sets the maximum backoff delay in milliseconds.
-dispatcherRetryBackoffMaxTimeInMs=1000
+dispatcherRetryBackoffMaxTimeInMs=10
 
 # Precise dispatcher flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 98e2c5e9464..10ca74a0b0e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -286,12 +286,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
 # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
 # delay. This parameter sets the initial backoff delay in milliseconds.
-dispatcherRetryBackoffInitialTimeInMs=100
+dispatcherRetryBackoffInitialTimeInMs=1
 
 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
 # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
 # delay. This parameter sets the maximum backoff delay in milliseconds.
-dispatcherRetryBackoffMaxTimeInMs=1000
+dispatcherRetryBackoffMaxTimeInMs=10
 
 # Precise dispatcher flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 834de2b77e9..867c2c2461e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1198,14 +1198,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
                     + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
                     + "delay. This parameter sets the initial backoff delay in milliseconds.")
-    private int dispatcherRetryBackoffInitialTimeInMs = 100;
+    private int dispatcherRetryBackoffInitialTimeInMs = 1;
 
     @FieldContext(
             category = CATEGORY_POLICIES,
             doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
                     + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
                     + "delay. This parameter sets the maximum backoff delay in milliseconds.")
-    private int dispatcherRetryBackoffMaxTimeInMs = 1000;
+    private int dispatcherRetryBackoffMaxTimeInMs = 10;
 
     @FieldContext(
             dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0f8404043c3..3c5d79476b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -696,10 +696,12 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
         int entriesDispatched = lastNumberOfEntriesDispatched;
         updatePendingBytesToDispatch(-totalBytesSize);
+        if (entriesDispatched > 0) {
+            // Reset the backoff when we successfully dispatched messages
+            retryBackoff.reset();
+        }
         if (triggerReadingMore) {
             if (entriesDispatched > 0) {
-                // Reset the backoff when we successfully dispatched messages
-                retryBackoff.reset();
                 // Call readMoreEntries in the same thread to trigger the next read
                 readMoreEntries();
             } else if (entriesDispatched == 0) {

Reply via email to