merlimat commented on a change in pull request #9329:
URL: https://github.com/apache/pulsar/pull/9329#discussion_r578145716
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
##########
@@ -45,12 +46,21 @@ public ExecutorProvider(int numThreads, ThreadFactory
threadFactory) {
for (int i = 0; i < numThreads; i++) {
executors.add(Executors.newSingleThreadScheduledExecutor(threadFactory));
}
+ isShutdown = false;
}
public ExecutorService getExecutor() {
return executors.get((currentThread.getAndIncrement() &
Integer.MAX_VALUE) % numThreads);
}
+ public ExecutorService getExecutor(Object object) {
+ return getExecutor(object == null ? -1 : object.hashCode() &
Integer.MAX_VALUE);
+ }
+
+ public ExecutorService getExecutor(int hash) {
Review comment:
In general, from API perspective, it would be better to just expose an
Object (or String) for selecting the thread. Couldn't we calculate the hash
internally always?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -69,7 +70,8 @@
protected final CompletableFuture<Consumer<T>> subscribeFuture;
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
- protected final ExecutorService listenerExecutor;
+ protected final ExecutorProvider executorProvider;
+ protected final ScheduledExecutorService pingedExecutor;
Review comment:
It's not clear to me what `pingedExecutor` means here. Does that mean
"pinned"?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]