This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 84c822a9d01 [fix][broker] Streaming dispatcher stuck after reading the 
first entry with SHARED subscriptions (#17143)
84c822a9d01 is described below

commit 84c822a9d0117e060bdbb8626d6adfed5cb2d80b
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Aug 18 13:26:51 2022 +0200

    [fix][broker] Streaming dispatcher stuck after reading the first entry with 
SHARED subscriptions (#17143)
---
 .../PersistentDispatcherMultipleConsumers.java       |  2 +-
 ...rsistentStreamingDispatcherMultipleConsumers.java | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7ed277ddfda..08257e909a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -114,7 +114,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     "blockedDispatcherOnUnackedMsgs");
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
     private AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
-    private final ExecutorService dispatchMessagesThread;
+    protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
     protected enum ReadType {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 5235c13dc81..1aa0c46e7eb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Lists;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -91,7 +92,24 @@ public class PersistentStreamingDispatcherMultipleConsumers 
extends PersistentDi
 
         cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
                 .getNextValidPosition((PositionImpl) entry.getPosition()));
-        sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+
+        // dispatch messages to a separate thread, but still in order for this 
subscription
+        // sendMessagesToConsumers is responsible for running broker-side 
filters
+        // that may be quite expensive
+        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+            // setting sendInProgress here, because sendMessagesToConsumers 
will be executed
+            // in a separate thread, and we want to prevent more reads
+            sendInProgress = true;
+            dispatchMessagesThread.execute(safeRun(() -> {
+                if (sendMessagesToConsumers(readType, 
Lists.newArrayList(entry))) {
+                    readMoreEntries();
+                }
+            }));
+        } else {
+            if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
+                readMoreEntriesAsync();
+            }
+        }
         ctx.recycle();
     }
 

Reply via email to