This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f316469d602438b18cdf859fae158e03b6c59dba
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu May 27 10:20:31 2021 +0900

    [broker] Fix issue where StackOverflowError occurs when trying to redeliver a large number of already acked messages (#10696)
    
    The other day, some of our broker servers got the following StackOverflowError:
    ```
    13:44:17.410 [pulsar-io-21-6] WARN  o.a.pulsar.broker.service.ServerCnx  - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null
    java.lang.StackOverflowError: null
            at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
            at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746)
            at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
            at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
            at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
            at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
            at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
            at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086)
            at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
            at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
    ```
    
    This phenomenon can be reproduced by the following procedure:
    
    1. Store a large number of messages in the backlog of a topic
    2. Connect some Shared consumers to the topic. These consumers receive messages but never acknowledge them
    3. Run skip-all to remove all messages from the backlog
    4. Add another consumer whose receiver queue size is small
    5. Close all the consumers added in step 2
    6. StackOverflowError occurs on the broker
    
    If the broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively many times. Whenever every entry in a replay batch turns out to have been deleted already, the dispatcher calls `readMoreEntries()` again on the same thread to read the next batch. As a result, the stack eventually overflows with a StackOverflowError.
    
    https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252
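    
    The following minimal, self-contained sketch illustrates the failure mode and the fix. It uses no Pulsar classes. `ToyDispatcher`, `pendingReplayBatches` and the two `readMoreEntries*()` methods are hypothetical stand-ins, and each "batch" models a replay read whose entries have all been deleted already. The recursive variant adds one stack frame per batch. The variant that hands the next call to an executor keeps the depth constant. The patch takes this approach with `topic.getBrokerService().executor()`.
    
    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class ToyDispatcher {
        private int pendingReplayBatches; // hypothetical: batches whose entries were all deleted already
        private int depth = 0;            // current nesting of readMoreEntries*() calls
        private int maxDepth = 0;         // deepest nesting observed
    
        ToyDispatcher(int pendingReplayBatches) {
            this.pendingReplayBatches = pendingReplayBatches;
        }
    
        // Before the fix: the next read starts directly on the same thread.
        void readMoreEntriesRecursive() {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            if (pendingReplayBatches > 0) {
                pendingReplayBatches--;     // nothing to dispatch in this batch
                readMoreEntriesRecursive(); // one extra stack frame per batch
            }
            depth--;
        }
    
        // After the fix: the next read is handed to an executor and this call returns.
        void readMoreEntriesAsync(ExecutorService executor, CountDownLatch done) {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            if (pendingReplayBatches > 0) {
                pendingReplayBatches--;
                executor.execute(() -> readMoreEntriesAsync(executor, done));
            } else {
                done.countDown();           // all batches processed
            }
            depth--;
        }
    
        static int runRecursive(int batches) {
            ToyDispatcher d = new ToyDispatcher(batches);
            d.readMoreEntriesRecursive();
            return d.maxDepth;
        }
    
        static int runAsync(int batches) {
            ToyDispatcher d = new ToyDispatcher(batches);
            // A single worker thread, so the dispatcher state is only touched by that thread
            ExecutorService executor = Executors.newSingleThreadExecutor();
            CountDownLatch done = new CountDownLatch(1);
            executor.execute(() -> d.readMoreEntriesAsync(executor, done));
            try {
                if (!done.await(30, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("timed out");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } finally {
                executor.shutdown();
            }
            return d.maxDepth;
        }
    
        public static void main(String[] args) {
            System.out.println("recursive, 1000 batches, max depth: " + runRecursive(1_000));
            System.out.println("executor, 1000 batches, max depth: " + runAsync(1_000));
            try {
                runRecursive(1_000_000);
            } catch (StackOverflowError e) {
                System.out.println("recursive, 1000000 batches: StackOverflowError");
            }
            System.out.println("executor, 1000000 batches, max depth: " + runAsync(1_000_000));
        }
    }
    ```
    
    With default JVM stack sizes, the recursive variant reaches a depth of 1001 for 1000 batches and overflows for 1,000,000. The executor variant stays at depth 1 in both cases.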
    
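    This commit makes the following changes:
    
    - Avoid recursive calls of `readMoreEntries()` on the same thread; the next call is scheduled on the broker executor instead
    - If the dispatcher receives redelivery requests for messages at or before the mark-delete position, it no longer adds them to `messagesToRedeliver` (or to the redelivery tracker), since those messages have already been acknowledged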
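    
    The second change comes down to a position comparison. The sketch below mirrors the condition added to `PersistentDispatcherMultipleConsumers#addMessageToReplay()`. It assumes that positions are ordered by ledger ID and then by entry ID. `Position`, the two `TreeSet`s and `redeliver()` are hypothetical stand-ins for `PositionImpl`, `messagesToRedeliver` and `redeliveryTracker`, not the real Pulsar types.
    
    ```java
    import java.util.Comparator;
    import java.util.TreeSet;
    
    public class ReplayFilterSketch {
        // Hypothetical stand-in for PositionImpl: a (ledgerId, entryId) pair
        static final class Position {
            final long ledgerId;
            final long entryId;
    
            Position(long ledgerId, long entryId) {
                this.ledgerId = ledgerId;
                this.entryId = entryId;
            }
    
            @Override
            public String toString() {
                return ledgerId + ":" + entryId;
            }
        }
    
        // Order by ledger ID first, then entry ID
        static final Comparator<Position> ORDER =
                Comparator.comparingLong((Position p) -> p.ledgerId).thenComparingLong(p -> p.entryId);
    
        final TreeSet<Position> messagesToRedeliver = new TreeSet<>(ORDER);
        final TreeSet<Position> redeliveryTracker = new TreeSet<>(ORDER);
        Position markDeletePosition; // null: nothing has been acknowledged yet
    
        // Same condition as the patched addMessageToReplay(): only positions
        // strictly after the mark-delete position are queued for replay.
        boolean addMessageToReplay(long ledgerId, long entryId) {
            Position mdp = markDeletePosition;
            if (mdp == null || ledgerId > mdp.ledgerId
                    || (ledgerId == mdp.ledgerId && entryId > mdp.entryId)) {
                messagesToRedeliver.add(new Position(ledgerId, entryId));
                return true;
            }
            return false; // already acknowledged, nothing to redeliver
        }
    
        // As in the patch, the tracker is updated only if the message was queued.
        void redeliver(long ledgerId, long entryId) {
            if (addMessageToReplay(ledgerId, entryId)) {
                redeliveryTracker.add(new Position(ledgerId, entryId));
            }
        }
    
        public static void main(String[] args) {
            ReplayFilterSketch dispatcher = new ReplayFilterSketch();
            dispatcher.markDeletePosition = new Position(5, 100);
            dispatcher.redeliver(4, 999); // earlier ledger: dropped
            dispatcher.redeliver(5, 100); // the mark-delete position itself: dropped
            dispatcher.redeliver(5, 101); // later entry in the same ledger: queued
            dispatcher.redeliver(6, 0);   // later ledger: queued
            System.out.println(dispatcher.messagesToRedeliver); // prints [5:101, 6:0]
            System.out.println(dispatcher.redeliveryTracker);   // prints [5:101, 6:0]
        }
    }
    ```
    
    Requests for positions at or before the mark-delete position are dropped. As a result, a flood of redelivery requests for already acked messages (for example after skip-all) no longer accumulates in `messagesToRedeliver`.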
    
    (cherry picked from commit 894d92b2be3bee334e7ce32760c4d2e7978603aa)
---
 .../broker/service/AbstractBaseDispatcher.java     |  4 +--
 .../PersistentDispatcherMultipleConsumers.java     | 30 ++++++++++++++--------
 .../PersistentDispatcherSingleActiveConsumer.java  |  3 ++-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 .../broker/service/PersistentTopicE2ETest.java     |  3 +++
 5 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 2813d70..997ce5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -200,8 +200,9 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         return key;
     }
 
-    protected void addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId) {
         // No-op
+        return false;
     }
 
     private void handleTxnCommitMarker(Entry entry) {
@@ -234,5 +235,4 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             }
         });
     }
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 27b5e2a..3f61156 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -209,8 +209,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     log.debug("[{}] Consumer are left, reading more entries", name);
                 }
                 consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
-                    messagesToRedeliver.add(ledgerId, entryId);
-                    redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
+                    if (addMessageToReplay(ledgerId, entryId)) {
+                        redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
+                    }
                 });
                 totalAvailablePermits -= consumer.getAvailablePermits();
                 if (log.isDebugEnabled()) {
@@ -337,7 +338,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
-                    readMoreEntries();
+                    // We should not call readMoreEntries() recursively in the same thread
+                    // as there is a risk of StackOverflowError
+                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
@@ -560,7 +563,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                         entries.size() - start);
             }
             entries.subList(start, entries.size()).forEach(entry -> {
-                messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
                 entry.release();
             });
         }
@@ -677,7 +680,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
         consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
-            messagesToRedeliver.add(ledgerId, entryId);
+            addMessageToReplay(ledgerId, entryId);
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
@@ -689,8 +692,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
         positions.forEach(position -> {
-            messagesToRedeliver.add(position.getLedgerId(), position.getEntryId());
-            redeliveryTracker.addIfAbsent(position);
+            if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
+                redeliveryTracker.addIfAbsent(position);
+            }
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
@@ -837,9 +841,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
-    @Override
-    public void addMessageToReplay(long ledgerId, long entryId) {
-        this.messagesToRedeliver.add(ledgerId, entryId);
+    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+        PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
+        if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId()
+                || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) {
+            messagesToRedeliver.add(ledgerId, entryId);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     public PersistentTopic getTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index a5c8a5f..2930a33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -598,8 +598,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     @Override
-    public void addMessageToReplay(long ledgerId, long entryId) {
+    public boolean addMessageToReplay(long ledgerId, long entryId) {
         this.messagesToRedeliver.add(ledgerId, entryId);
+        return false;
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 27bcf97..45921dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -200,7 +200,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 // so we discard for now and mark them for later redelivery
                 for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
                     Entry entry = entriesWithSameKey.get(i);
-                    messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
+                    addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
                     entry.release();
                     entriesWithSameKey.set(i, null);
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 2b6e9e2..407593f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1603,6 +1603,9 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         replayMap.set(dispatcher, messagesToReplay);
         // (a) redelivery with all acked-message should clear messageReply bucket
         dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            return messagesToReplay.isEmpty();
+        });
         assertEquals(messagesToReplay.size(), 0);
 
         // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it
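
The test change in `PersistentTopicE2ETest` uses Awaitility to wait up to 10 seconds for `messagesToReplay` to become empty. Previously it asserted as soon as `redeliverUnacknowledgedMessages()` returned. A plausible reason, which the commit does not state, is that follow-up `readMoreEntries()` calls now run later on the broker executor. The sketch below shows the same wait-then-assert pattern in plain Java. `awaitUntil()` is a hypothetical polling helper, not an Awaitility or Pulsar API. The executor task stands in for the asynchronous cleanup.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class AwaitSketch {
    // Hypothetical helper: poll `condition` every 10 ms until it holds or the timeout expires
    static boolean awaitUntil(BooleanSupplier condition, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        Set<Long> messagesToReplay = ConcurrentHashMap.newKeySet();
        for (long entryId = 0; entryId < 5; entryId++) {
            messagesToReplay.add(entryId);
        }
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The cleanup runs later on another thread, like an executor-scheduled readMoreEntries()
        executor.execute(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            messagesToReplay.clear();
        });
        // Checking messagesToReplay.isEmpty() immediately would likely see false here
        boolean emptied = awaitUntil(messagesToReplay::isEmpty, 10, TimeUnit.SECONDS);
        executor.shutdown();
        System.out.println("emptied: " + emptied); // prints "emptied: true"
    }
}
```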
