This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e16a35dbc5b [fix][broker] Streaming dispatcher stuck after reading the first entry with SHARED subscriptions (#17143)
e16a35dbc5b is described below
commit e16a35dbc5b18c45ab9549b5143d849958b996a7
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Aug 18 13:26:51 2022 +0200
[fix][broker] Streaming dispatcher stuck after reading the first entry with SHARED subscriptions (#17143)
---
.../PersistentDispatcherMultipleConsumers.java | 2 +-
...rsistentStreamingDispatcherMultipleConsumers.java | 20 +++++++++++++++++++-
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 34f63e392dc..94cee65f912 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -114,7 +114,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
"blockedDispatcherOnUnackedMsgs");
protected Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
private AtomicBoolean isRescheduleReadInProgress = new
AtomicBoolean(false);
- private final ExecutorService dispatchMessagesThread;
+ protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
protected enum ReadType {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index ed637b46379..3cbc43cfdfe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -91,7 +92,24 @@ public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDi
cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
.getNextValidPosition((PositionImpl) entry.getPosition()));
- sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+
+ // dispatch messages to a separate thread, but still in order for this subscription
+ // sendMessagesToConsumers is responsible for running broker-side filters
+ // that may be quite expensive
+ if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+ // setting sendInProgress here, because sendMessagesToConsumers will be executed
+ // in a separate thread, and we want to prevent more reads
+ sendInProgress = true;
+ dispatchMessagesThread.execute(safeRun(() -> {
+ if (sendMessagesToConsumers(readType,
Lists.newArrayList(entry))) {
+ readMoreEntries();
+ }
+ }));
+ } else {
+ if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
+ readMoreEntriesAsync();
+ }
+ }
ctx.recycle();
}