This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a88d9520208 [fix][broker]Prevent `StackOverFlowException` in SHARED 
subscription(#16968)
a88d9520208 is described below

commit a88d9520208abc02d506fe4eb0ab2557129bc184
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Aug 9 01:30:46 2022 +0800

    [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
---
 .../PersistentDispatcherMultipleConsumers.java     | 37 +++++++++++++---------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..4887f6b0541 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-    protected boolean sendInProgress;
+    protected volatile boolean sendInProgress;
     protected static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         readMoreEntries();
     }
 
+    /**
+     * We should not call readMoreEntries() recursively in the same thread as 
there is a risk of StackOverflowError.
+     *
+     */
+    public void readMoreEntiresAsync() {
+        topic.getBrokerService().executor().execute(() -> readMoreEntries());
+    }
+
     public synchronized void readMoreEntries() {
         if (sendInProgress) {
             // we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never 
called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
                     havePendingReplayRead = false;
-                    // We should not call readMoreEntries() recursively in the 
same thread
-                    // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
+                    readMoreEntiresAsync();
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 
TRUE) {
                 if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
             // setting sendInProgress here, because sendMessagesToConsumers 
will be executed
             // in a separate thread, and we want to prevent more reads
-            sendInProgress = true;
-            dispatchMessagesThread.execute(safeRun(() -> 
sendMessagesToConsumers(readType, entries)));
+            dispatchMessagesThread.execute(safeRun(() -> {
+                if (sendMessagesToConsumers(readType, entries)) {
+                    readMoreEntries();
+                }
+            }));
         } else {
-            sendMessagesToConsumers(readType, entries);
+            if (sendMessagesToConsumers(readType, entries)) {
+                readMoreEntiresAsync();
+            }
         }
     }
 
-    protected final synchronized void sendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
+    protected final synchronized boolean sendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
         sendInProgress = true;
-        boolean readMoreEntries;
         try {
-            readMoreEntries = trySendMessagesToConsumers(readType, entries);
+            return trySendMessagesToConsumers(readType, entries);
         } finally {
             sendInProgress = false;
         }
-        if (readMoreEntries) {
-            readMoreEntries();
-        }
     }
 
     /**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
                 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 
TRUE, FALSE)) {
             log.info("[{}] Dispatcher is unblocked, since 
maxUnackedMessagesPerSubscription=0", name);
-            topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
+            readMoreEntiresAsync();
         }
 
         int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, 
numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 
TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
+                readMoreEntiresAsync();
             }
         }
         // increment broker-level count

Reply via email to