lhotari commented on code in PR #22861: URL: https://github.com/apache/pulsar/pull/22861#discussion_r1630111105
########## pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java: ########## @@ -283,6 +283,21 @@ public interface ConsumerBuilder<T> extends Cloneable { */ ConsumerBuilder<T> messageListener(MessageListener<T> messageListener); + /** + * Set the {@link ThreadPoolProvider} to be used for message listeners of <b>current consumer</b>. + * <i>(default: use executor from PulsarClient, + * {@link org.apache.pulsar.client.impl.PulsarClientImpl#externalExecutorProvider})</i>. + * + * <p>The listener thread pool is exclusively owned by current consumer + * that are using a "listener" model to get messages. For a given internal consumer, + * the listener will always be invoked from the same thread, to ensure ordering. + * + * <p> The user need to shut down the thread pool after closing the consumer to avoid leaks. + * @param listenerThreadsProvider the threads provider of the consumer message listener + * @return the consumer builder instance + */ + ConsumerBuilder<T> listenerThreadsProvider(ThreadPoolProvider listenerThreadsProvider); Review Comment: It's possible that `java.util.concurrent.Executor` isn't optimal. Pulsar client executes Key_Shared subscriptions on different threads: https://github.com/apache/pulsar/blob/fb80007a47deaadb82d0b1b1e4fcd6ca04c05c9c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L1129-L1136 I guess that's the reason why you have added the `ThreadPoolProvider`. To simplify, I guess one possibility is to have an interface with an execute method that provides the `Message` and the `Runnable`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org