This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f702190948 [fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
9f702190948 is described below
commit 9f7021909486ac97590b17a6c245367900ab989c
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Jul 13 11:59:19 2022 +0200
[fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
---
.../service/persistent/PersistentDispatcherMultipleConsumers.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6d6e8a72f00..6af58557a83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -211,7 +211,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
@Override
- public synchronized void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
+ public void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
+ topic.getBrokerService().executor().execute(() -> {
+ internalConsumerFlow(consumer, additionalNumberOfMessages);
+ });
+ }
+
+ private synchronized void internalConsumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
if (!consumerSet.contains(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ignoring flow control from disconnected
consumer {}", name, consumer);