This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bad716ec3739b7fb2760f6f534306fcb778fc0c6
Author: lipenghui <[email protected]>
AuthorDate: Sun Feb 21 11:12:13 2021 +0800

    Able to handling messages with multiple listener threads in order for the 
Key_Shared subscription. (#9329)
    
    Currently, a consumer is pinged to a given listener thread to ensure the 
message is processed ordered for a topic. But for the Key_Shared subscription, 
the message process order is based on the message key, not the order of the 
topic. So this PR is able to handle messages with multiple listener threads for 
the Key_Shared subscription which also keeps the order of message key.
    
    (cherry picked from commit e4523e09b38514bba87a9fe9842f43d9f7cdce34)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 70 +++++++++++++++++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  7 ++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 70 +++++++++++++++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 66 +++++-------------
 .../client/impl/MultiTopicsConsumerImpl.java       | 79 ++++++++--------------
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  6 +-
 .../impl/PatternMultiTopicsConsumerImpl.java       |  7 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 18 ++---
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  7 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  | 10 +--
 .../pulsar/client/util/ExecutorProvider.java       | 21 ++++++
 .../pulsar/client/impl/ClientTestFixtures.java     |  5 ++
 .../pulsar/client/impl/ConsumerImplTest.java       |  8 +--
 .../client/impl/MultiTopicsConsumerImplTest.java   | 16 ++---
 .../apache/pulsar/client/impl/ReaderImplTest.java  |  3 +-
 16 files changed, 247 insertions(+), 148 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 4146d40..f57d0bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -113,7 +113,7 @@ public class RawReaderImpl implements RawReader {
             super(client,
                 conf.getSingleTopic(),
                 conf,
-                client.externalExecutorProvider().getExecutor(),
+                client.externalExecutorProvider(),
                 TopicName.getPartitionIndex(conf.getSingleTopic()),
                 false,
                 consumerFuture,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 50011d4..e9b30b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcher
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -67,6 +69,14 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         };
     }
 
+    @DataProvider(name = "partitioned")
+    public Object[][] partitionedProvider() {
+        return new Object[][] {
+                { false },
+                { true }
+        };
+    }
+
     @DataProvider(name = "data")
     public Object[][] dataProvider() {
         return new Object[][] {
@@ -855,6 +865,66 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
     }
 
+    @Test(dataProvider = "partitioned")
+    public void testOrderingWithConsumerListener(boolean partitioned) throws 
Exception {
+        final String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+        final String subName = "my-sub";
+        final int messages = 1000;
+        List<Message<Integer>> received = Collections.synchronizedList(new 
ArrayList<>(1000));
+        Random random = new Random();
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .listenerThreads(8)
+                .build();
+
+        Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .messageListener(new MessageListener<Integer>() {
+                    @Override
+                    public void received(Consumer<Integer> consumer, 
Message<Integer> msg) {
+                        try {
+                            Thread.sleep(random.nextInt(5));
+                            received.add(msg);
+                        } catch (InterruptedException ignore) {
+                        }
+                    }
+                })
+                .subscribe();
+
+
+        Producer<Integer> producer = client.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        String[] keys = new String[]{"key-1", "key-2", "key-3"};
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().key(keys[i % 3]).value(i).send();
+        }
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(received.size(), messages));
+
+        Map<String, Integer> maxValueOfKeys = new HashMap<>();
+        for (Message<Integer> msg : received) {
+            String key = msg.getKey();
+            Integer value = msg.getValue();
+            if (maxValueOfKeys.containsKey(key)) {
+                Assert.assertTrue(value > maxValueOfKeys.get(key));
+            }
+            maxValueOfKeys.put(key, value);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, 
String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 17da94a..216438b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -20,11 +20,16 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -37,6 +42,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dc4d2c1..30c3d01 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -20,8 +20,9 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +30,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -52,7 +53,9 @@ import 
org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -70,7 +73,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final CompletableFuture<Consumer<T>> subscribeFuture;
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
-    protected final ExecutorService listenerExecutor;
+    protected final ExecutorProvider executorProvider;
+    protected final ScheduledExecutorService pinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> 
unAckedChunckedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
@@ -86,7 +90,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final Lock reentrantLock = new ReentrantLock();
 
     protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-                           int receiverQueueSize, ExecutorService 
listenerExecutor,
+                           int receiverQueueSize, ExecutorProvider 
executorProvider,
                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema, ConsumerInterceptors interceptors) {
         super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
@@ -99,8 +103,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
         this.unAckedChunckedMessageIdSequenceMap = new 
ConcurrentOpenHashMap<>();
-
-        this.listenerExecutor = listenerExecutor;
+        this.executorProvider = executorProvider;
+        this.pinnedExecutor = (ScheduledExecutorService) 
executorProvider.getExecutor();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
@@ -232,7 +236,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     protected void completePendingReceive(CompletableFuture<Message<T>> 
receivedFuture, Message<T> message) {
-        listenerExecutor.execute(() -> {
+        pinnedExecutor.execute(() -> {
             if (!receivedFuture.complete(message)) {
                 log.warn("Race condition detected. receive future was already 
completed (cancelled={}) and message was dropped. message={}",
                         receivedFuture.isCancelled(), message);
@@ -842,6 +846,58 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
+    protected void triggerListener() {
+        // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
+        // thread while the message processing happens
+        Message<T> msg;
+        do {
+            try {
+                msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    final Message<T> finalMsg = msg;
+                    if (SubscriptionType.Key_Shared == 
conf.getSubscriptionType()) {
+                        
executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() ->
+                                callMessageListener(finalMsg));
+                    } else {
+                        pinnedExecutor.execute(() -> 
callMessageListener(finalMsg));
+                    }
+                }
+            } catch (PulsarClientException e) {
+                log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
+                return;
+            }
+        } while (msg != null);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Message has been cleared from the queue", 
topic, subscription);
+        }
+    }
+
+    protected void callMessageListener(Message<T> msg) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Calling message listener for message {}", 
topic, subscription,
+                        msg.getMessageId());
+            }
+            listener.received(ConsumerBase.this, msg);
+        } catch (Throwable t) {
+            log.error("[{}][{}] Message listener error in processing message: 
{}", topic, subscription,
+                    msg.getMessageId(), t);
+        }
+    }
+
+    protected static final byte[] NONE_KEY = 
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
+    protected byte[] peekMessageKey(Message<T> msg) {
+        byte[] key = NONE_KEY;
+        if (msg.hasKey()) {
+            key = msg.getKeyBytes();
+        }
+        if (msg.hasOrderingKey()) {
+            key = msg.getOrderingKey();
+        }
+        return key;
+    }
+
     protected MessagesImpl<T> getNewMessagesImpl() {
         return new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(),
                 batchReceivePolicy.getMaxNumBytes());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d4055b0..ef39bfe 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,8 +48,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -81,6 +79,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
@@ -195,7 +194,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
-                                               ExecutorService 
listenerExecutor,
+                                               ExecutorProvider 
executorProvider,
                                                int partitionIndex,
                                                boolean hasParentConsumer,
                                                CompletableFuture<Consumer<T>> 
subscribeFuture,
@@ -203,14 +202,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                Schema<T> schema,
                                                ConsumerInterceptors<T> 
interceptors,
                                                boolean 
createTopicIfDoesNotExist) {
-        return newConsumerImpl(client, topic, conf, listenerExecutor, 
partitionIndex, hasParentConsumer, subscribeFuture,
+        return newConsumerImpl(client, topic, conf, executorProvider, 
partitionIndex, hasParentConsumer, subscribeFuture,
                 startMessageId, schema, interceptors, 
createTopicIfDoesNotExist, 0);
     }
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
-                                               ExecutorService 
listenerExecutor,
+                                               ExecutorProvider 
executorProvider,
                                                int partitionIndex,
                                                boolean hasParentConsumer,
                                                CompletableFuture<Consumer<T>> 
subscribeFuture,
@@ -220,23 +219,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                boolean 
createTopicIfDoesNotExist,
                                                long 
startMessageRollbackDurationInSec) {
         if (conf.getReceiverQueueSize() == 0) {
-            return new ZeroQueueConsumerImpl<>(client, topic, conf, 
listenerExecutor, partitionIndex, hasParentConsumer,
+            return new ZeroQueueConsumerImpl<>(client, topic, conf, 
executorProvider, partitionIndex, hasParentConsumer,
                     subscribeFuture,
                     startMessageId, schema, interceptors,
                     createTopicIfDoesNotExist);
         } else {
-            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, 
partitionIndex, hasParentConsumer,
+            return new ConsumerImpl<>(client, topic, conf, executorProvider, 
partitionIndex, hasParentConsumer,
                     subscribeFuture, startMessageId, 
startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
                     schema, interceptors, createTopicIfDoesNotExist);
         }
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer,
+            ExecutorProvider executorProvider, int partitionIndex, boolean 
hasParentConsumer,
             CompletableFuture<Consumer<T>> subscribeFuture, MessageId 
startMessageId,
             long startMessageRollbackDurationInSec, Schema<T> schema, 
ConsumerInterceptors<T> interceptors,
             boolean createTopicIfDoesNotExist) {
-        super(client, topic, conf, conf.getReceiverQueueSize(), 
listenerExecutor, subscribeFuture, schema, interceptors);
+        super(client, topic, conf, conf.getReceiverQueueSize(), 
executorProvider, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = conf.getSubscriptionMode();
         this.startMessageId = startMessageId != null ? new 
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -1095,7 +1094,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private void failPendingReceive() {
         lock.readLock().lock();
         try {
-            if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
+            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
                 failPendingReceives(this.pendingReceives);
                 failPendingBatchReceives(this.pendingBatchReceives);
             }
@@ -1109,7 +1108,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return;
         }
 
-        listenerExecutor.execute(() -> {
+        pinnedExecutor.execute(() -> {
             if (isActive) {
                 consumerEventListener.becameActive(this, partitionIndex);
             } else {
@@ -1227,7 +1226,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         if (listener != null) {
-            triggerListener(numMessages);
+            triggerListener();
         }
     }
 
@@ -1240,7 +1239,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && 
expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            ((ScheduledExecutorService) 
listenerExecutor).scheduleAtFixedRate(() -> {
+            pinnedExecutor.scheduleAtFixedRate(() -> {
                 removeExpireIncompleteChunkedMessages();
             }, expireTimeOfIncompleteChunkedMessageMillis, 
expireTimeOfIncompleteChunkedMessageMillis,
                     TimeUnit.MILLISECONDS);
@@ -1317,39 +1316,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return uncompressedPayload;
     }
 
-    protected void triggerListener(int numMessages) {
-        // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-        // thread while the message processing happens
-        listenerExecutor.execute(() -> {
-            for (int i = 0; i < numMessages; i++) {
-                try {
-                    Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                    // complete the callback-loop in case queue is cleared up
-                    if (msg == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] [{}] Message has been cleared from 
the queue", topic, subscription);
-                        }
-                        break;
-                    }
-                    try {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}][{}] Calling message listener for 
message {}", topic, subscription,
-                                    msg.getMessageId());
-                        }
-                        listener.received(ConsumerImpl.this, msg);
-                    } catch (Throwable t) {
-                        log.error("[{}][{}] Message listener error in 
processing message: {}", topic, subscription,
-                                msg.getMessageId(), t);
-                    }
-
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                    return;
-                }
-            }
-        });
-    }
-
     /**
      * Notify waiting asyncReceive request with the received message
      *
@@ -1367,13 +1333,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         if (exception != null) {
-            listenerExecutor.execute(() -> 
receivedFuture.completeExceptionally(exception));
+            pinnedExecutor.execute(() -> 
receivedFuture.completeExceptionally(exception));
             return;
         }
 
         if (message == null) {
             IllegalStateException e = new IllegalStateException("received 
message can't be null");
-            listenerExecutor.execute(() -> 
receivedFuture.completeExceptionally(e));
+            pinnedExecutor.execute(() -> 
receivedFuture.completeExceptionally(e));
             return;
         }
 
@@ -2199,9 +2165,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return;
             }
 
-            ((ScheduledExecutorService) listenerExecutor).schedule(() -> {
+            pinnedExecutor.schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while 
getLastMessageId -- Will try again in {} ms",
-                    topic, getHandlerName(), nextDelay);
+                        topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
                 internalGetLastMessageIdAsync(backoff, remainingTime, future);
             }, nextDelay, TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8efab22..b53a342 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
@@ -38,6 +41,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -59,7 +63,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -68,8 +71,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
@@ -108,31 +110,33 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private final long startMessageRollbackDurationInSec;
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
             ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
-        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, listenerExecutor,
+        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, executorProvider,
                 subscribeFuture, schema, interceptors, 
createTopicIfDoesNotExist);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-                            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
-                            long startMessageRollbackDurationInSec) {
-        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, listenerExecutor,
-                subscribeFuture, schema, interceptors, 
createTopicIfDoesNotExist, startMessageId, startMessageRollbackDurationInSec);
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
+            long startMessageRollbackDurationInSec) {
+        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, executorProvider,
+                subscribeFuture, schema, interceptors, 
createTopicIfDoesNotExist, startMessageId,
+                startMessageRollbackDurationInSec);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, 
ConsumerConfigurationData<T> conf,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-                            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
-        this(client, singleTopic, conf, listenerExecutor, subscribeFuture, 
schema, interceptors, createTopicIfDoesNotExist, null, 0);
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
+        this(client, singleTopic, conf, executorProvider, subscribeFuture, 
schema, interceptors,
+                createTopicIfDoesNotExist, null, 0);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, 
ConsumerConfigurationData<T> conf,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-                            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
-                            long startMessageRollbackDurationInSec) {
-        super(client, singleTopic, conf, Math.max(2, 
conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
+            long startMessageRollbackDurationInSec) {
+        super(client, singleTopic, conf, Math.max(2, 
conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
                 schema, interceptors);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
@@ -282,34 +286,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         if (listener != null) {
-            // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-            // thread while the message processing happens
-            listenerExecutor.execute(() -> {
-                Message<T> msg;
-                try {
-                    msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                    if (msg == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] [{}] Message has been cleared from 
the queue", topic, subscription);
-                        }
-                        return;
-                    }
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                    return;
-                }
-
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}][{}] Calling message listener for 
message {}",
-                            topic, subscription, message.getMessageId());
-                    }
-                    listener.received(MultiTopicsConsumerImpl.this, msg);
-                } catch (Throwable t) {
-                    log.error("[{}][{}] Message listener error in processing 
message: {}",
-                        topic, subscription, message, t);
-                }
-            });
+            triggerListener();
         }
     }
 
@@ -597,7 +574,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     private void failPendingReceive() {
-        if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
+        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
             failPendingReceives(pendingReceives);
             failPendingBatchReceives(pendingBatchReceives);
         }
@@ -856,7 +833,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     // first create a consumer with no topic, then do subscription for already 
know partitionedTopic.
     public static <T> MultiTopicsConsumerImpl<T> 
createPartitionedConsumer(PulsarClientImpl client,
                                                                            
ConsumerConfigurationData<T> conf,
-                                                                           
ExecutorService listenerExecutor,
+                                                                           
ExecutorProvider executorProvider,
                                                                            
CompletableFuture<Consumer<T>> subscribeFuture,
                                                                            int 
numPartitions,
                                                                            
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -868,7 +845,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         cloneConf.getTopicNames().remove(topicName);
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
topicName, cloneConf, listenerExecutor,
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
topicName, cloneConf, executorProvider,
                 future, schema, interceptors, true /* 
createTopicIfDoesNotExist */);
 
         future.thenCompose(c -> 
((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
@@ -952,7 +929,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
-                                configurationData, 
client.externalExecutorProvider().getExecutor(),
+                                configurationData, 
client.externalExecutorProvider(),
                                 partitionIndex, true, subFuture,
                                 startMessageId, schema, interceptors,
                                 createIfDoesNotExist, 
startMessageRollbackDurationInSec);
@@ -973,7 +950,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
             CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
             ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, 
topicName, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), -1, true, 
subFuture, null,
+                    client.externalExecutorProvider(), -1, true, subFuture, 
null,
                     schema, interceptors,
                     createIfDoesNotExist);
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
@@ -1250,7 +1227,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
                         ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
-                            client.externalExecutorProvider().getExecutor(),
+                            client.externalExecutorProvider(),
                             partitionIndex, true, subFuture, null, schema, 
interceptors,
                             true /* createTopicIfDoesNotExist */);
                         synchronized (pauseMutex) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 9824b54..06e8245 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.codec.digest.DigestUtils;
@@ -43,13 +42,14 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     private final MultiTopicsConsumerImpl<T> multiTopicsConsumer;
 
     public MultiTopicsReaderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> readerConfiguration,
-                                 ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) {
+                                 ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) {
         String subscription = "multiTopicsReader-" + 
DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
         if 
(StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) {
             subscription = readerConfiguration.getSubscriptionRolePrefix() + 
"-" + subscription;
@@ -98,7 +98,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
                             .ranges(readerConfiguration.getKeyHashRanges())
             );
         }
-        multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, 
consumerConfiguration, listenerExecutor, consumerFuture, schema,
+        multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, 
consumerConfiguration, executorProvider, consumerFuture, schema,
                 null, true, readerConfiguration.getStartMessageId(), 
readerConfiguration.getStartMessageFromRollbackDurationInSec());
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 91b994a..af161f0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -28,14 +28,15 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -52,10 +53,10 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
                                           PulsarClientImpl client,
                                           ConsumerConfigurationData<T> conf,
-                                          ExecutorService listenerExecutor,
+                                          ExecutorProvider executorProvider,
                                           CompletableFuture<Consumer<T>> 
subscribeFuture,
                                           Schema<T> schema, Mode 
subscriptionMode, ConsumerInterceptors<T> interceptors) {
-        super(client, conf, listenerExecutor, subscribeFuture, schema, 
interceptors,
+        super(client, conf, executorProvider, subscribeFuture, schema, 
interceptors,
                 false /* createTopicIfDoesNotExist */);
         this.topicsPattern = topicsPattern;
         this.subscriptionMode = subscriptionMode;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 149c617..43b8acb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -372,14 +372,12 @@ public class PulsarClientImpl implements PulsarClient {
             }
 
             ConsumerBase<T> consumer;
-            // gets the next single threaded executor from the list of 
executors
-            ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             if (metadata.partitions > 0) {
                 consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
-                    listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
+                        externalExecutorProvider, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
             } else {
                 int partitionIndex = TopicName.getPartitionIndex(topic);
-                consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, 
topic, conf, listenerThread, partitionIndex, false,
+                consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, 
topic, conf, externalExecutorProvider, partitionIndex, false,
                         consumerSubscribedFuture,null, schema, interceptors,
                         true /* createTopicIfDoesNotExist */);
             }
@@ -398,7 +396,7 @@ public class PulsarClientImpl implements PulsarClient {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
-                externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema, interceptors,
+                externalExecutorProvider, consumerSubscribedFuture, schema, 
interceptors,
                 true /* createTopicIfDoesNotExist */);
 
         consumers.add(consumer);
@@ -431,7 +429,7 @@ public class PulsarClientImpl implements PulsarClient {
                 ConsumerBase<T> consumer = new 
PatternMultiTopicsConsumerImpl<T>(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
                     conf,
-                    externalExecutorProvider.getExecutor(),
+                    externalExecutorProvider,
                     consumerSubscribedFuture,
                     schema, subscriptionMode, interceptors);
 
@@ -501,16 +499,14 @@ public class PulsarClientImpl implements PulsarClient {
                 return;
             }
             CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
-            // gets the next single threaded executor from the list of 
executors
-            ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             Reader<T> reader;
             ConsumerBase<T> consumer;
             if (metadata.partitions > 0) {
                 reader = new MultiTopicsReaderImpl<>(PulsarClientImpl.this,
-                        conf, listenerThread, consumerSubscribedFuture, 
schema);
+                        conf, externalExecutorProvider, 
consumerSubscribedFuture, schema);
                 consumer = ((MultiTopicsReaderImpl<T>) 
reader).getMultiTopicsConsumer();
             } else {
-                reader = new ReaderImpl<>(PulsarClientImpl.this, conf, 
listenerThread, consumerSubscribedFuture, schema);
+                reader = new ReaderImpl<>(PulsarClientImpl.this, conf, 
externalExecutorProvider, consumerSubscribedFuture, schema);
                 consumer = ((ReaderImpl<T>) reader).getConsumer();
             }
 
@@ -641,7 +637,7 @@ public class PulsarClientImpl implements PulsarClient {
         return timer;
     }
 
-    ExecutorProvider externalExecutorProvider() {
+    public ExecutorProvider externalExecutorProvider() {
         return externalExecutorProvider;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 146f5fc..66ff338 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -21,13 +21,14 @@ package org.apache.pulsar.client.impl;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.TopicName;
 
 public class ReaderImpl<T> implements Reader<T> {
@@ -38,7 +39,7 @@ public class ReaderImpl<T> implements Reader<T> {
     private final ConsumerImpl<T> consumer;
 
     public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> 
readerConfiguration,
-                      ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) {
+                      ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) {
 
         String subscription = "reader-" + 
DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
         if 
(StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) {
@@ -98,7 +99,7 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = 
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, 
readerConfiguration.getTopicName(), consumerConfiguration,
-                listenerExecutor, partitionIdx, false, consumerFuture,
+                executorProvider, partitionIdx, false, consumerFuture,
                 readerConfiguration.getStartMessageId(), 
readerConfiguration.getStartMessageFromRollbackDurationInSec(),
                 schema, null, true /* createTopicIfDoesNotExist */);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index ba6e97d..03d1315 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -39,6 +38,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 @Slf4j
 public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
@@ -49,11 +49,11 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
     private volatile boolean waitingOnListenerForZeroQueueSize = false;
 
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+            ExecutorProvider executorProvider, int partitionIndex, boolean 
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
             MessageId startMessageId, Schema<T> schema,
             ConsumerInterceptors<T> interceptors,
             boolean createTopicIfDoesNotExist) {
-        super(client, topic, conf, listenerExecutor, partitionIndex, 
hasParentConsumer, subscribeFuture,
+        super(client, topic, conf, executorProvider, partitionIndex, 
hasParentConsumer, subscribeFuture,
                 startMessageId, 0 /* startMessageRollbackDurationInSec */, 
schema, interceptors,
                 createTopicIfDoesNotExist);
     }
@@ -152,7 +152,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
         checkNotNull(listener, "listener can't be null");
         checkNotNull(message, "unqueued message can't be null");
 
-        listenerExecutor.execute(() -> {
+        pinnedExecutor.execute(() -> {
             stats.updateNumMsgsReceived(message);
             try {
                 if (log.isDebugEnabled()) {
@@ -172,7 +172,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
     }
 
     @Override
-    protected void triggerListener(int numMessages) {
+    protected void triggerListener() {
         // Ignore since it was already triggered in the 
triggerZeroQueueSizeListener() call
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 48fca39..6a446d0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -30,12 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 @Slf4j
 public class ExecutorProvider {
     private final int numThreads;
     private final List<ExecutorService> executors;
     private final AtomicInteger currentThread = new AtomicInteger(0);
+    private volatile boolean isShutdown;
 
     public ExecutorProvider(int numThreads, ThreadFactory threadFactory) {
         checkArgument(numThreads > 0);
@@ -45,12 +47,26 @@ public class ExecutorProvider {
         for (int i = 0; i < numThreads; i++) {
             
executors.add(Executors.newSingleThreadScheduledExecutor(threadFactory));
         }
+        isShutdown = false;
     }
 
     public ExecutorService getExecutor() {
         return executors.get((currentThread.getAndIncrement() & 
Integer.MAX_VALUE) % numThreads);
     }
 
+    public ExecutorService getExecutor(Object object) {
+        return getExecutorInternal(object == null ? -1 : object.hashCode() & 
Integer.MAX_VALUE);
+    }
+
+    public ExecutorService getExecutor(byte[] bytes) {
+        int keyHash = Murmur3_32Hash.getInstance().makeHash(bytes);
+        return getExecutorInternal(keyHash);
+    }
+
+    private ExecutorService getExecutorInternal(int hash) {
+        return executors.get((hash & Integer.MAX_VALUE) % numThreads);
+    }
+
     public void shutdownNow() {
         executors.forEach(executor -> {
             executor.shutdownNow();
@@ -60,5 +76,10 @@ public class ExecutorProvider {
                 log.warn("Shutdown of thread pool was interrupted");
             }
         });
+        isShutdown = true;
+    }
+
+    public boolean isShutdown() {
+        return isShutdown;
     }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 085a4a9..8bb7bbc 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoop;
 import io.netty.util.Timer;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.mockito.Mockito;
@@ -81,4 +82,8 @@ class ClientTestFixtures {
     public static ExecutorService createMockedExecutor() {
         return mock(ExecutorService.class);
     }
+
+    public static ExecutorProvider createMockedExecutorProvider() {
+        return mock(ExecutorProvider.class);
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 75d7753..dec7c2a 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -28,16 +28,16 @@ import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -45,7 +45,7 @@ import org.testng.annotations.Test;
 public class ConsumerImplTest {
 
 
-    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+    private final ExecutorProvider executorProvider = new ExecutorProvider(1, 
new DefaultThreadFactory("ConsumerImplTest"));
     private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
 
@@ -61,7 +61,7 @@ public class ConsumerImplTest {
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, false, subscribeFuture, null, null, null,
+                executorProvider, -1, false, subscribeFuture, null, null, null,
                 true);
         consumer.setState(HandlerState.State.Ready);
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 664e78a..b246a87 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.Test;
@@ -36,12 +37,9 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.regex.Pattern;
 
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
@@ -67,7 +65,7 @@ public class MultiTopicsConsumerImplTest {
 
         ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
         EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
-        ExecutorService listenerExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+        ExecutorProvider executorProvider = new ExecutorProvider(1, 
threadFactory);
 
         PulsarClientImpl clientImpl = new PulsarClientImpl(conf, 
eventLoopGroup);
 
@@ -78,7 +76,7 @@ public class MultiTopicsConsumerImplTest {
 
         MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
             clientImpl, consumerConfData,
-            listenerExecutor, null, null, null, true);
+            executorProvider, null, null, null, true);
 
         impl.getStats();
     }
@@ -105,7 +103,7 @@ public class MultiTopicsConsumerImplTest {
     }
 
     private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() {
-        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
         int completionDelayMillis = 100;
@@ -115,7 +113,7 @@ public class MultiTopicsConsumerImplTest {
                 new PartitionedTopicMetadata(), completionDelayMillis));
         when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), 
any()))
                 .thenReturn(CompletableFuture.completedFuture(schema));
-        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
             new CompletableFuture<>(), schema, null, true);
         return impl;
     }
@@ -146,7 +144,7 @@ public class MultiTopicsConsumerImplTest {
 
     @Test
     public void testConsumerCleanupOnSubscribeFailure() throws 
InterruptedException, TimeoutException, ExecutionException {
-        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
         consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", 
"c")));
@@ -156,7 +154,7 @@ public class MultiTopicsConsumerImplTest {
         
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createExceptionFuture(
                 new PulsarClientException.InvalidConfigurationException("a 
mock exception"), completionDelayMillis));
         CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
-        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
                 completeFuture, schema, null, true);
         // assert that we don't start in closed, then we move to closed and 
get an exception
         // indicating that closeAsync was called
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index e4d50db..6e587f7 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -39,7 +39,8 @@ public class ReaderImplTest {
         ReaderConfigurationData<byte[]> readerConfiguration = new 
ReaderConfigurationData<>();
         readerConfiguration.setTopicName("topicName");
         CompletableFuture<Consumer<byte[]>> consumerFuture = new 
CompletableFuture<>();
-        reader = new ReaderImpl<>(mockedClient, readerConfiguration, 
ClientTestFixtures.createMockedExecutor(), consumerFuture, Schema.BYTES);
+        reader = new ReaderImpl<>(mockedClient, readerConfiguration, 
ClientTestFixtures.createMockedExecutorProvider(),
+                consumerFuture, Schema.BYTES);
     }
 
     @Test

Reply via email to