This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f316469d602438b18cdf859fae158e03b6c59dba Author: Masahiro Sakamoto <[email protected]> AuthorDate: Thu May 27 10:20:31 2021 +0900 [broker] Fix issue where StackOverflowError occurs when trying to redeliver a large number of already acked messages (#10696) The other day, some of our broker servers got the following StackOverflowError: ``` 13:44:17.410 [pulsar-io-21-6] WARN o.a.pulsar.broker.service.ServerCnx - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null java.lang.StackOverflowError: null at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) ``` This phenomenon can be reproduced by the following procedure: 1. Store a large number of messages in the backlog of a topic 2. Connect some Shared consumers to the topic. These consumers receive messages but do not acknowledge them at all 3. Run skip-all to remove all messages from the backlog 4. Add another consumer whose receiver queue size is small 5. Close all the consumers added in step 2 6. StackOverflowError occurs on the broker If the broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively many times. As a result, we get a StackOverflowError. https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252 Modifications: - Avoid recursive calls of `readMoreEntries()` on the same thread - If the dispatcher receives redelivery requests for messages whose positions are at or before the mark-delete position, it does not add them to `messagesToRedeliver` (cherry picked from commit 894d92b2be3bee334e7ce32760c4d2e7978603aa) --- .../broker/service/AbstractBaseDispatcher.java | 4 +-- .../PersistentDispatcherMultipleConsumers.java | 30 ++++++++++++++-------- .../PersistentDispatcherSingleActiveConsumer.java | 3 ++- ...istentStickyKeyDispatcherMultipleConsumers.java | 2 +- .../broker/service/PersistentTopicE2ETest.java | 3 +++ 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 2813d70..997ce5c 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -200,8 +200,9 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { return key; } - protected void addMessageToReplay(long ledgerId, long entryId) { + protected boolean addMessageToReplay(long ledgerId, long entryId) { // No-op + return false; } private void handleTxnCommitMarker(Entry entry) { @@ -234,5 +235,4 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { } }); } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 27b5e2a..3f61156 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -209,8 +209,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul log.debug("[{}] Consumer are left, reading more entries", name); } consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - messagesToRedeliver.add(ledgerId, entryId); - redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); + if (addMessageToReplay(ledgerId, entryId)) { + redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); + } }); totalAvailablePermits -= consumer.getAvailablePermits(); if (log.isDebugEnabled()) { @@ -337,7 +338,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; - readMoreEntries(); + // We should not call 
readMoreEntries() recursively in the same thread + // as there is a risk of StackOverflowError + topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, @@ -560,7 +563,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); entry.release(); }); } @@ -677,7 +680,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - messagesToRedeliver.add(ledgerId, entryId); + addMessageToReplay(ledgerId, entryId); }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, @@ -689,8 +692,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) { positions.forEach(position -> { - messagesToRedeliver.add(position.getLedgerId(), position.getEntryId()); - redeliveryTracker.addIfAbsent(position); + if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) { + redeliveryTracker.addIfAbsent(position); + } }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); @@ -837,9 +841,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } } - @Override - public void addMessageToReplay(long ledgerId, long entryId) { - this.messagesToRedeliver.add(ledgerId, 
entryId); + protected boolean addMessageToReplay(long ledgerId, long entryId) { + PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); + if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() + || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) { + messagesToRedeliver.add(ledgerId, entryId); + return true; + } else { + return false; + } } public PersistentTopic getTopic() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index a5c8a5f..2930a33 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -598,8 +598,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp } @Override - public void addMessageToReplay(long ledgerId, long entryId) { + public boolean addMessageToReplay(long ledgerId, long entryId) { this.messagesToRedeliver.add(ledgerId, entryId); + return false; } private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 27bcf97..45921dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -200,7 +200,7 @@ 
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi // so we discard for now and mark them for later redelivery for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { Entry entry = entriesWithSameKey.get(i); - messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); entry.release(); entriesWithSameKey.set(i, null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 2b6e9e2..407593f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1603,6 +1603,9 @@ public class PersistentTopicE2ETest extends BrokerTestBase { replayMap.set(dispatcher, messagesToReplay); // (a) redelivery with all acked-message should clear messageReply bucket dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + return messagesToReplay.isEmpty(); + }); assertEquals(messagesToReplay.size(), 0); // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it
