AuroraTwinkle commented on code in PR #22861:
URL: https://github.com/apache/pulsar/pull/22861#discussion_r1634455525


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java:
##########
@@ -283,6 +283,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
      */
     ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
 
+    /**
+     * Set the {@link ThreadPoolProvider} to be used for message listeners of <b>current consumer</b>.
+     * <i>(default: use executor from PulsarClient,
+     * {@link org.apache.pulsar.client.impl.PulsarClientImpl#externalExecutorProvider})</i>.
+     *
+     * <p>The listener thread pool is exclusively owned by current consumer
+     * that are using a "listener" model to get messages. For a given internal consumer,
+     * the listener will always be invoked from the same thread, to ensure ordering.
+     *
+     * <p> The user need to shut down the thread pool after closing the consumer to avoid leaks.
+     * @param listenerThreadsProvider the threads provider of the consumer message listener
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> listenerThreadsProvider(ThreadPoolProvider listenerThreadsProvider);

Review Comment:
   > It's possible that `java.util.concurrent.Executor` isn't optimal.
   > 
   > Pulsar client executes Key_Shared subscriptions on different threads:
   > 
   > 
   > https://github.com/apache/pulsar/blob/fb80007a47deaadb82d0b1b1e4fcd6ca04c05c9c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L1129-L1136
   > 
   > I guess that's the reason why you have added the `ThreadPoolProvider`. To simplify, I guess one possibility is to have an interface with an execute method that provides the `Message` and the `Runnable`.
   
   Yes. In current Pulsar client versions, we need to ensure that each consumer's message listener always runs on the same thread, both to avoid lock contention and to preserve message order, so it cannot simply use `java.util.concurrent.Executor`. I will simplify the interface later. Thanks for the idea!
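
   For illustration only, the interface idea quoted above could look roughly like the sketch below. Everything in it is an assumption and not part of this PR: `MessageListenerExecutor` and `KeyOrderedExecutor` are hypothetical names, and `Msg` is a stand-in for `org.apache.pulsar.client.api.Message` so the example compiles without the Pulsar client. The point is that once the executor sees the message, it can pick the thread itself, for example by hashing an ordering key, so tasks for the same key stay on one thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ListenerExecutorSketch {

    /** Stand-in for a Pulsar Message (assumption); only the ordering key matters here. */
    static final class Msg {
        final String key;
        final int seq;

        Msg(String key, int seq) {
            this.key = key;
            this.seq = seq;
        }
    }

    /** Hypothetical interface: the client passes the message and the listener invocation. */
    interface MessageListenerExecutor {
        void execute(Msg message, Runnable task);
    }

    /** Routes every task with the same key to the same single-threaded worker. */
    static final class KeyOrderedExecutor implements MessageListenerExecutor, AutoCloseable {
        private final ExecutorService[] workers;

        KeyOrderedExecutor(int threads) {
            workers = new ExecutorService[threads];
            for (int i = 0; i < threads; i++) {
                workers[i] = Executors.newSingleThreadExecutor();
            }
        }

        @Override
        public void execute(Msg message, Runnable task) {
            // Same key -> same index -> same thread, so per-key order is preserved.
            int idx = Math.floorMod(message.key.hashCode(), workers.length);
            workers[idx].execute(task);
        }

        @Override
        public void close() throws InterruptedException {
            // The owner of the pool shuts it down, mirroring the "avoid leaks" note in the Javadoc.
            for (ExecutorService w : workers) {
                w.shutdown();
            }
            for (ExecutorService w : workers) {
                w.awaitTermination(5, TimeUnit.SECONDS);
            }
        }
    }

    /** Dispatches interleaved messages for three keys and returns the sequence observed per key. */
    static Map<String, List<Integer>> runDemo() throws InterruptedException {
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        KeyOrderedExecutor executor = new KeyOrderedExecutor(4);
        try {
            for (int seq = 0; seq < 100; seq++) {
                for (String key : new String[] {"a", "b", "c"}) {
                    Msg m = new Msg(key, seq);
                    executor.execute(m, () -> seen
                            .computeIfAbsent(m.key, k -> Collections.synchronizedList(new ArrayList<>()))
                            .add(m.seq));
                }
            }
        } finally {
            executor.close();
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        runDemo().forEach((key, seqs) -> System.out.println(key + " -> " + seqs.size() + " messages"));
    }
}
```

   A plain `Executor` only receives the `Runnable`, so it has nothing to route on; passing the message alongside the task is what lets the implementation keep per-key (or per-consumer) ordering while still using more than one thread.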
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
