eolivelli commented on code in PR #16603:
URL: https://github.com/apache/pulsar/pull/16603#discussion_r921893502
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -520,10 +521,19 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
- sendMessagesToConsumers(readType, entries);
+ if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+ // dispatch messages to a separate thread, but still in order for this subscription
+ // sendMessagesToConsumers is responsible for running broker-side filters
+ // that may be quite expensive
+ topic.getBrokerService().getTopicOrderedExecutor()
+ .executeOrdered(name,
Review Comment:
`do we care about potential (unchecked) RejectedExecutionException here?`
We usually don't handle RejectedExecutionException in Dispatcher code.
In any case, even if the execution is rejected, the Dispatcher will read more
entries for some other reason.
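For reference, a minimal sketch of what tolerating the rejection could look like if we ever wanted it. This is not what the Dispatcher does today; `RejectedSubmitSketch` and `trySubmit` are hypothetical names, and a plain `ExecutorService` stands in for the topic ordered executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration only, not Pulsar code.
public class RejectedSubmitSketch {

    // Returns true if the executor accepted the task, false if it rejected it.
    static boolean trySubmit(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // Unchecked: thrown for example when the executor is already shut down.
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println(trySubmit(executor, () -> { })); // accepted
        executor.shutdown();
        System.out.println(trySubmit(executor, () -> { })); // rejected after shutdown
    }
}
```

A caller that gets `false` back could simply skip this round and rely on a later read to pick the entries up again.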
`we may need to prevent this code from submitting unlimited amount of
runnables into that executor / handle long backlog.`
This shouldn't be a problem because there is a "natural" backpressure
mechanism here: this code is triggered when new entries are read from storage
(BK/Offloaders), but we request new entries after dispatching the previous
ones to Consumers.
So usually there is only one of these tasks pending for the subscription.
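
To illustrate the read/dispatch loop, here is a simplified model of it, assuming a single-threaded executor in place of the ordered executor. All names (`BackpressureSketch`, `readMoreEntries`, `maxPendingTasks`) are made up for the sketch and the storage read is simulated synchronously; it is not the actual Dispatcher code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the read -> dispatch -> read loop, not Pulsar code.
public class BackpressureSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicInteger maxPending = new AtomicInteger();
    private final AtomicInteger remainingReads;
    private final CountDownLatch done = new CountDownLatch(1);

    BackpressureSketch(int totalReads) {
        this.remainingReads = new AtomicInteger(totalReads);
    }

    // Stand-in for a storage read; completes immediately and fires the callback.
    void readMoreEntries() {
        if (remainingReads.getAndDecrement() <= 0) {
            done.countDown();
            return;
        }
        readEntriesComplete();
    }

    // Stand-in for the read callback: hand the dispatch work to the executor.
    void readEntriesComplete() {
        int now = pending.incrementAndGet();
        maxPending.accumulateAndGet(now, Math::max);
        executor.execute(() -> {
            // Dispatch to consumers (and run filters) would happen here.
            pending.decrementAndGet();
            // The next read is requested only after this dispatch is done.
            readMoreEntries();
        });
    }

    // Runs the loop and returns the highest number of dispatch tasks seen pending.
    static int maxPendingTasks(int totalReads) {
        BackpressureSketch sketch = new BackpressureSketch(totalReads);
        sketch.readMoreEntries();
        try {
            if (!sketch.done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sketch.executor.shutdown();
        }
        return sketch.maxPending.get();
    }

    public static void main(String[] args) {
        System.out.println("max pending dispatch tasks: " + maxPendingTasks(100));
        // prints "max pending dispatch tasks: 1"
    }
}
```

Because each submitted task is the only place that requests the next read, the number of queued tasks in this model never grows past one, regardless of how long the backlog is.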