codelipenghui commented on code in PR #16603:
URL: https://github.com/apache/pulsar/pull/16603#discussion_r921110853


##########
conf/broker.conf:
##########
@@ -403,6 +403,9 @@ dispatchThrottlingOnNonBacklogConsumerEnabled=true
 # Max number of entries to read from bookkeeper. By default it is 100 entries.
 dispatcherMaxReadBatchSize=100
 
+# Dispatch messages and execute broker side filters in a per-subscription 
thread
+dispatcherDispatchMessagesInSubscriptionThread=true

Review Comment:
   Should we keep it false by default? Most of the users don't have broker-side 
filters. Enabled by default will introduce more thread switching.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -520,10 +521,19 @@ public synchronized void readEntriesComplete(List<Entry> 
entries, Object ctx) {
             log.debug("[{}] Distributing {} messages to {} consumers", name, 
entries.size(), consumerList.size());
         }
 
-        sendMessagesToConsumers(readType, entries);
+        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+            // dispatch messages to a separate thread, but still in order for 
this subscription
+            // sendMessagesToConsumers is responsible for running broker-side 
filters
+            // that may be quite expensive
+            topic.getBrokerService().getTopicOrderedExecutor()
+                    .executeOrdered(name,
+                            safeRun(() -> sendMessagesToConsumers(readType, 
entries)));

Review Comment:
   We can have a pinned executor instead of selecting the executor for each 
operation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -520,10 +521,19 @@ public synchronized void readEntriesComplete(List<Entry> 
entries, Object ctx) {
             log.debug("[{}] Distributing {} messages to {} consumers", name, 
entries.size(), consumerList.size());
         }
 
-        sendMessagesToConsumers(readType, entries);
+        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+            // dispatch messages to a separate thread, but still in order for 
this subscription
+            // sendMessagesToConsumers is responsible for running broker-side 
filters
+            // that may be quite expensive
+            topic.getBrokerService().getTopicOrderedExecutor()
+                    .executeOrdered(name,
+                            safeRun(() -> sendMessagesToConsumers(readType, 
entries)));
+        } else {
+            sendMessagesToConsumers(readType, entries);
+        }
     }
 
-    protected void sendMessagesToConsumers(ReadType readType, List<Entry> 
entries) {
+    protected synchronized void sendMessagesToConsumers(ReadType readType, 
List<Entry> entries) {

Review Comment:
   It's better to have a check and only apply the lock if 
`dispatcherDispatchMessagesInSubscriptionThread` is enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to