This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4ce0c752cc4 [fix] Key_Shared mode consumption latency when low traffic
(#23340)
4ce0c752cc4 is described below
commit 4ce0c752cc4b2d6dccb818ab0ffa854e82e42b85
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 24 03:50:01 2024 +0800
[fix] Key_Shared mode consumption latency when low traffic (#23340)
Co-authored-by: Lari Hotari <[email protected]>
---
conf/broker.conf | 4 ++--
conf/standalone.conf | 4 ++--
.../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++--
.../service/persistent/PersistentDispatcherMultipleConsumers.java | 6 ++++--
4 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 125b2aa8c1b..617e202e5ec 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -489,12 +489,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
# On Shared and KeyShared subscriptions, if all available messages in the
subscription are filtered
# out and not dispatched to any consumer, message dispatching will be
rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
-dispatcherRetryBackoffInitialTimeInMs=100
+dispatcherRetryBackoffInitialTimeInMs=1
# On Shared and KeyShared subscriptions, if all available messages in the
subscription are filtered
# out and not dispatched to any consumer, message dispatching will be
rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
-dispatcherRetryBackoffMaxTimeInMs=1000
+dispatcherRetryBackoffMaxTimeInMs=10
# Precise dispatcher flow control according to history message number of each
entry
preciseDispatcherFlowControl=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 622949bf6c3..535800a43f3 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -305,12 +305,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
# On Shared and KeyShared subscriptions, if all available messages in the
subscription are filtered
# out and not dispatched to any consumer, message dispatching will be
rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
-dispatcherRetryBackoffInitialTimeInMs=100
+dispatcherRetryBackoffInitialTimeInMs=1
# On Shared and KeyShared subscriptions, if all available messages in the
subscription are filtered
# out and not dispatched to any consumer, message dispatching will be
rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
-dispatcherRetryBackoffMaxTimeInMs=1000
+dispatcherRetryBackoffMaxTimeInMs=10
# Precise dispatcher flow control according to history message number of each
entry
preciseDispatcherFlowControl=false
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 486587ec174..33b4fbff5f5 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1231,14 +1231,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "On Shared and KeyShared subscriptions, if all available
messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message
dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the initial backoff delay in
milliseconds.")
- private int dispatcherRetryBackoffInitialTimeInMs = 100;
+ private int dispatcherRetryBackoffInitialTimeInMs = 1;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "On Shared and KeyShared subscriptions, if all available
messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message
dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the maximum backoff delay in
milliseconds.")
- private int dispatcherRetryBackoffMaxTimeInMs = 1000;
+ private int dispatcherRetryBackoffMaxTimeInMs = 10;
@FieldContext(
dynamic = true,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 450a446c85a..8fdb65e7b30 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -729,11 +729,13 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
boolean triggerReadingMore = sendMessagesToConsumers(readType,
entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
updatePendingBytesToDispatch(-totalBytesSize);
+ if (entriesDispatched > 0) {
+ // Reset the backoff when we successfully dispatched messages
+ retryBackoff.reset();
+ }
if (triggerReadingMore) {
if (entriesDispatched > 0 || skipNextBackoff) {
skipNextBackoff = false;
- // Reset the backoff when we successfully dispatched messages
- retryBackoff.reset();
// Call readMoreEntries in the same thread to trigger the next
read
readMoreEntries();
} else if (entriesDispatched == 0) {