This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a88d9520208 [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
a88d9520208 is described below
commit a88d9520208abc02d506fe4eb0ab2557129bc184
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Aug 9 01:30:46 2022 +0800
[fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
---
.../PersistentDispatcherMultipleConsumers.java | 37 +++++++++++++---------
1 file changed, 22 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..4887f6b0541 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
- protected boolean sendInProgress;
+ protected volatile boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
readMoreEntries();
}
+ /**
+ * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
+ *
+ */
+ public void readMoreEntiresAsync() {
+ topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ }
+
public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
- // We should not call readMoreEntries() recursively in the same thread
- // as there is a risk of StackOverflowError
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
- sendInProgress = true;
- dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+ dispatchMessagesThread.execute(safeRun(() -> {
+ if (sendMessagesToConsumers(readType, entries)) {
+ readMoreEntries();
+ }
+ }));
} else {
- sendMessagesToConsumers(readType, entries);
+ if (sendMessagesToConsumers(readType, entries)) {
+ readMoreEntiresAsync();
+ }
}
}
- protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
- boolean readMoreEntries;
try {
- readMoreEntries = trySendMessagesToConsumers(readType, entries);
+ return trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
- if (readMoreEntries) {
- readMoreEntries();
- }
}
/**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
- topic.getBrokerService().executor().execute(() -> readMoreEntries());
+ readMoreEntiresAsync();
}
}
// increment broker-level count