This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4523e0  Able to handle messages with multiple listener threads in 
order for the Key_Shared subscription. (#9329)
e4523e0 is described below

commit e4523e09b38514bba87a9fe9842f43d9f7cdce34
Author: lipenghui <[email protected]>
AuthorDate: Sun Feb 21 11:12:13 2021 +0800

    Able to handle messages with multiple listener threads in order for the 
Key_Shared subscription. (#9329)
    
    ### Motivation
    
    Currently, a consumer is pinned to a given listener thread to ensure that 
messages are processed in order for a topic. But for the Key_Shared subscription, 
the message processing order is based on the message key, not the order of the 
topic. So this PR makes it possible to handle messages with multiple listener 
threads for the Key_Shared subscription while still preserving the order of 
messages for each key.
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 70 ++++++++++++++++++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  5 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    | 70 +++++++++++++++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 73 ++++++---------------
 .../client/impl/MultiTopicsConsumerImpl.java       | 75 ++++++++--------------
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  6 +-
 .../impl/PatternMultiTopicsConsumerImpl.java       |  7 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 18 ++----
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  7 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  | 16 ++---
 .../pulsar/client/util/ExecutorProvider.java       | 21 ++++++
 .../pulsar/client/impl/ClientTestFixtures.java     |  5 ++
 .../pulsar/client/impl/ConsumerImplTest.java       |  8 +--
 .../client/impl/MultiTopicsConsumerImplTest.java   | 16 ++---
 .../apache/pulsar/client/impl/ReaderImplTest.java  |  3 +-
 16 files changed, 247 insertions(+), 155 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 01f8fc8..3f590dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -114,7 +114,7 @@ public class RawReaderImpl implements RawReader {
             super(client,
                     conf.getSingleTopic(),
                     conf,
-                    client.externalExecutorProvider().getExecutor(),
+                    client.externalExecutorProvider(),
                     TopicName.getPartitionIndex(conf.getSingleTopic()),
                     false,
                     consumerFuture,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index b5e032f..7084bb7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,6 +30,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -71,6 +73,14 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         };
     }
 
+    @DataProvider(name = "partitioned")
+    public Object[][] partitionedProvider() {
+        return new Object[][] {
+                { false },
+                { true }
+        };
+    }
+
     @DataProvider(name = "data")
     public Object[][] dataProvider() {
         return new Object[][] {
@@ -859,6 +869,66 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
     }
 
+    @Test(dataProvider = "partitioned")
+    public void testOrderingWithConsumerListener(boolean partitioned) throws 
Exception {
+        final String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+        final String subName = "my-sub";
+        final int messages = 1000;
+        List<Message<Integer>> received = Collections.synchronizedList(new 
ArrayList<>(1000));
+        Random random = new Random();
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .listenerThreads(8)
+                .build();
+
+        Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .messageListener(new MessageListener<Integer>() {
+                    @Override
+                    public void received(Consumer<Integer> consumer, 
Message<Integer> msg) {
+                        try {
+                            Thread.sleep(random.nextInt(5));
+                            received.add(msg);
+                        } catch (InterruptedException ignore) {
+                        }
+                    }
+                })
+                .subscribe();
+
+
+        Producer<Integer> producer = client.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        String[] keys = new String[]{"key-1", "key-2", "key-3"};
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().key(keys[i % 3]).value(i).send();
+        }
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(received.size(), messages));
+
+        Map<String, Integer> maxValueOfKeys = new HashMap<>();
+        for (Message<Integer> msg : received) {
+            String key = msg.getKey();
+            Integer value = msg.getValue();
+            if (maxValueOfKeys.containsKey(key)) {
+                Assert.assertTrue(value > maxValueOfKeys.get(key));
+            }
+            maxValueOfKeys.put(key, value);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
     @Test
     public void testKeySharedConsumerWithEncrypted() throws Exception {
         final String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index ecba1ce..8da4457 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -43,6 +44,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -518,7 +520,8 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         when(mockClient.eventLoopGroup()).thenReturn(new NioEventLoopGroup());
         try {
             ConsumerImpl consumer = new ConsumerImpl(mockClient, "my-topic", 
consumerConfig,
-            Executors.newSingleThreadExecutor(), 0, false, new 
CompletableFuture<>(),
+                    new ExecutorProvider(1, new DefaultThreadFactory("test")), 
0,
+                    false, new CompletableFuture<>(),
             MessageId.earliest, 100, Schema.BYTES,
                     new ConsumerInterceptors(Collections.emptyList()), false);
         } catch (Exception exception) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9ab54bb..0993f0d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -20,19 +20,18 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -53,9 +52,11 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -69,7 +70,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final CompletableFuture<Consumer<T>> subscribeFuture;
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
-    protected final ExecutorService listenerExecutor;
+    protected final ExecutorProvider executorProvider;
+    protected final ScheduledExecutorService pinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> 
unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
@@ -85,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final Lock reentrantLock = new ReentrantLock();
 
     protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-                           int receiverQueueSize, ExecutorService 
listenerExecutor,
+                           int receiverQueueSize, ExecutorProvider 
executorProvider,
                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema, ConsumerInterceptors interceptors) {
         super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
@@ -98,8 +100,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
         this.unAckedChunkedMessageIdSequenceMap = new 
ConcurrentOpenHashMap<>();
-
-        this.listenerExecutor = listenerExecutor;
+        this.executorProvider = executorProvider;
+        this.pinnedExecutor = (ScheduledExecutorService) 
executorProvider.getExecutor();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
@@ -231,7 +233,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     protected void completePendingReceive(CompletableFuture<Message<T>> 
receivedFuture, Message<T> message) {
-        listenerExecutor.execute(() -> {
+        pinnedExecutor.execute(() -> {
             if (!receivedFuture.complete(message)) {
                 log.warn("Race condition detected. receive future was already 
completed (cancelled={}) and message was dropped. message={}",
                         receivedFuture.isCancelled(), message);
@@ -842,6 +844,58 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
+    protected void triggerListener() {
+        // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
+        // thread while the message processing happens
+        Message<T> msg;
+        do {
+            try {
+                msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    final Message<T> finalMsg = msg;
+                    if (SubscriptionType.Key_Shared == 
conf.getSubscriptionType()) {
+                        
executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() ->
+                                callMessageListener(finalMsg));
+                    } else {
+                        pinnedExecutor.execute(() -> 
callMessageListener(finalMsg));
+                    }
+                }
+            } catch (PulsarClientException e) {
+                log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
+                return;
+            }
+        } while (msg != null);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Message has been cleared from the queue", 
topic, subscription);
+        }
+    }
+
+    protected void callMessageListener(Message<T> msg) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Calling message listener for message {}", 
topic, subscription,
+                        msg.getMessageId());
+            }
+            listener.received(ConsumerBase.this, msg);
+        } catch (Throwable t) {
+            log.error("[{}][{}] Message listener error in processing message: 
{}", topic, subscription,
+                    msg.getMessageId(), t);
+        }
+    }
+
+    protected static final byte[] NONE_KEY = 
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
+    protected byte[] peekMessageKey(Message<T> msg) {
+        byte[] key = NONE_KEY;
+        if (msg.hasKey()) {
+            key = msg.getKeyBytes();
+        }
+        if (msg.hasOrderingKey()) {
+            key = msg.getOrderingKey();
+        }
+        return key;
+    }
+
     protected MessagesImpl<T> getNewMessagesImpl() {
         return new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(),
                 batchReceivePolicy.getMaxNumBytes());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b5c81da..724dc44 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -47,8 +47,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -57,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -77,6 +76,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -188,7 +188,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
-                                               ExecutorService 
listenerExecutor,
+                                               ExecutorProvider 
executorProvider,
                                                int partitionIndex,
                                                boolean hasParentConsumer,
                                                CompletableFuture<Consumer<T>> 
subscribeFuture,
@@ -197,14 +197,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                ConsumerInterceptors<T> 
interceptors,
                                                boolean 
createTopicIfDoesNotExist)
             throws PulsarClientException.InvalidConfigurationException {
-        return newConsumerImpl(client, topic, conf, listenerExecutor, 
partitionIndex, hasParentConsumer, subscribeFuture,
+        return newConsumerImpl(client, topic, conf, executorProvider, 
partitionIndex, hasParentConsumer, subscribeFuture,
                 startMessageId, schema, interceptors, 
createTopicIfDoesNotExist, 0);
     }
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
-                                               ExecutorService 
listenerExecutor,
+                                               ExecutorProvider 
executorProvider,
                                                int partitionIndex,
                                                boolean hasParentConsumer,
                                                CompletableFuture<Consumer<T>> 
subscribeFuture,
@@ -215,23 +215,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                long 
startMessageRollbackDurationInSec)
             throws PulsarClientException.InvalidConfigurationException {
         if (conf.getReceiverQueueSize() == 0) {
-            return new ZeroQueueConsumerImpl<>(client, topic, conf, 
listenerExecutor, partitionIndex, hasParentConsumer,
+            return new ZeroQueueConsumerImpl<>(client, topic, conf, 
executorProvider, partitionIndex, hasParentConsumer,
                     subscribeFuture,
                     startMessageId, schema, interceptors,
                     createTopicIfDoesNotExist);
         } else {
-            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, 
partitionIndex, hasParentConsumer,
+            return new ConsumerImpl<>(client, topic, conf, executorProvider, 
partitionIndex, hasParentConsumer,
                     subscribeFuture, startMessageId, 
startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
                     schema, interceptors, createTopicIfDoesNotExist);
         }
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer,
-            CompletableFuture<Consumer<T>> subscribeFuture, MessageId 
startMessageId,
-            long startMessageRollbackDurationInSec, Schema<T> schema, 
ConsumerInterceptors<T> interceptors,
-            boolean createTopicIfDoesNotExist) throws 
PulsarClientException.InvalidConfigurationException {
-        super(client, topic, conf, conf.getReceiverQueueSize(), 
listenerExecutor, subscribeFuture, schema, interceptors);
+           ExecutorProvider executorProvider, int partitionIndex, boolean 
hasParentConsumer,
+           CompletableFuture<Consumer<T>> subscribeFuture, MessageId 
startMessageId,
+           long startMessageRollbackDurationInSec, Schema<T> schema, 
ConsumerInterceptors<T> interceptors,
+           boolean createTopicIfDoesNotExist) throws 
PulsarClientException.InvalidConfigurationException {
+        super(client, topic, conf, conf.getReceiverQueueSize(), 
executorProvider, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = conf.getSubscriptionMode();
         this.startMessageId = startMessageId != null ? new 
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -947,7 +947,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private void failPendingReceive() {
         lock.readLock().lock();
         try {
-            if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
+            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
                 failPendingReceives(this.pendingReceives);
                 failPendingBatchReceives(this.pendingBatchReceives);
             }
@@ -961,7 +961,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return;
         }
 
-        listenerExecutor.execute(() -> {
+        pinnedExecutor.execute(() -> {
             if (isActive) {
                 consumerEventListener.becameActive(this, partitionIndex);
             } else {
@@ -1077,7 +1077,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         if (listener != null) {
-            triggerListener(numMessages);
+            triggerListener();
         }
     }
 
@@ -1090,7 +1090,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && 
expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            ((ScheduledExecutorService) 
listenerExecutor).scheduleAtFixedRate(() -> {
+            pinnedExecutor.scheduleAtFixedRate(() -> {
                 removeExpireIncompleteChunkedMessages();
             }, expireTimeOfIncompleteChunkedMessageMillis, 
expireTimeOfIncompleteChunkedMessageMillis,
                     TimeUnit.MILLISECONDS);
@@ -1167,39 +1167,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return uncompressedPayload;
     }
 
-    protected void triggerListener(int numMessages) {
-        // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-        // thread while the message processing happens
-        listenerExecutor.execute(() -> {
-            for (int i = 0; i < numMessages; i++) {
-                try {
-                    Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                    // complete the callback-loop in case queue is cleared up
-                    if (msg == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] [{}] Message has been cleared from 
the queue", topic, subscription);
-                        }
-                        break;
-                    }
-                    try {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}][{}] Calling message listener for 
message {}", topic, subscription,
-                                    msg.getMessageId());
-                        }
-                        listener.received(ConsumerImpl.this, msg);
-                    } catch (Throwable t) {
-                        log.error("[{}][{}] Message listener error in 
processing message: {}", topic, subscription,
-                                msg.getMessageId(), t);
-                    }
-
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                    return;
-                }
-            }
-        });
-    }
-
     /**
      * Notify waiting asyncReceive request with the received message
      *
@@ -1217,13 +1184,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         if (exception != null) {
-            listenerExecutor.execute(() -> 
receivedFuture.completeExceptionally(exception));
+            pinnedExecutor.execute(() -> 
receivedFuture.completeExceptionally(exception));
             return;
         }
 
         if (message == null) {
             IllegalStateException e = new IllegalStateException("received 
message can't be null");
-            listenerExecutor.execute(() -> 
receivedFuture.completeExceptionally(e));
+            pinnedExecutor.execute(() -> 
receivedFuture.completeExceptionally(e));
             return;
         }
 
@@ -2082,9 +2049,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return;
             }
 
-            ((ScheduledExecutorService) listenerExecutor).schedule(() -> {
+            pinnedExecutor.schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while 
getLastMessageId -- Will try again in {} ms",
-                    topic, getHandlerName(), nextDelay);
+                        topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
                 internalGetLastMessageIdAsync(backoff, remainingTime, future);
             }, nextDelay, TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 2c81d0a..c806c7f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -44,7 +44,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -66,6 +65,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -109,31 +109,33 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private final long startMessageRollbackDurationInSec;
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
             ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
-        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, listenerExecutor,
+        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, executorProvider,
                 subscribeFuture, schema, interceptors, 
createTopicIfDoesNotExist);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-                            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
-                            long startMessageRollbackDurationInSec) {
-        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, listenerExecutor,
-                subscribeFuture, schema, interceptors, 
createTopicIfDoesNotExist, startMessageId, startMessageRollbackDurationInSec);
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
+            long startMessageRollbackDurationInSec) {
+        this(client, DUMMY_TOPIC_NAME_PREFIX + 
ConsumerName.generateRandomName(), conf, executorProvider,
+                subscribeFuture, schema, interceptors, 
createTopicIfDoesNotExist, startMessageId,
+                startMessageRollbackDurationInSec);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, 
ConsumerConfigurationData<T> conf,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-                            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
-        this(client, singleTopic, conf, listenerExecutor, subscribeFuture, 
schema, interceptors, createTopicIfDoesNotExist, null, 0);
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
+        this(client, singleTopic, conf, executorProvider, subscribeFuture, 
schema, interceptors,
+                createTopicIfDoesNotExist, null, 0);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, 
ConsumerConfigurationData<T> conf,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-                            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
-                            long startMessageRollbackDurationInSec) {
-        super(client, singleTopic, conf, Math.max(2, 
conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
+            ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist, MessageId startMessageId,
+            long startMessageRollbackDurationInSec) {
+        super(client, singleTopic, conf, Math.max(2, 
conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
                 schema, interceptors);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
@@ -283,34 +285,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         if (listener != null) {
-            // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-            // thread while the message processing happens
-            listenerExecutor.execute(() -> {
-                Message<T> msg;
-                try {
-                    msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                    if (msg == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] [{}] Message has been cleared from 
the queue", topic, subscription);
-                        }
-                        return;
-                    }
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                    return;
-                }
-
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}][{}] Calling message listener for 
message {}",
-                            topic, subscription, message.getMessageId());
-                    }
-                    listener.received(MultiTopicsConsumerImpl.this, msg);
-                } catch (Throwable t) {
-                    log.error("[{}][{}] Message listener error in processing 
message: {}",
-                        topic, subscription, message, t);
-                }
-            });
+            triggerListener();
         }
     }
 
@@ -598,7 +573,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     private void failPendingReceive() {
-        if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
+        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
             failPendingReceives(pendingReceives);
             failPendingBatchReceives(pendingBatchReceives);
         }
@@ -857,7 +832,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     // first create a consumer with no topic, then do subscription for already 
know partitionedTopic.
     public static <T> MultiTopicsConsumerImpl<T> 
createPartitionedConsumer(PulsarClientImpl client,
                                                                            
ConsumerConfigurationData<T> conf,
-                                                                           
ExecutorService listenerExecutor,
+                                                                           
ExecutorProvider executorProvider,
                                                                            
CompletableFuture<Consumer<T>> subscribeFuture,
                                                                            int 
numPartitions,
                                                                            
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -869,7 +844,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         cloneConf.getTopicNames().remove(topicName);
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
topicName, cloneConf, listenerExecutor,
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
topicName, cloneConf, executorProvider,
                 future, schema, interceptors, true /* 
createTopicIfDoesNotExist */);
 
         future.thenCompose(c -> 
((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
@@ -955,7 +930,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = null;
                         try {
                             newConsumer = ConsumerImpl.newConsumerImpl(client, 
partitionName,
-                                    configurationData, 
client.externalExecutorProvider().getExecutor(),
+                                    configurationData, 
client.externalExecutorProvider(),
                                     partitionIndex, true, subFuture,
                                     startMessageId, schema, interceptors,
                                     createIfDoesNotExist, 
startMessageRollbackDurationInSec);
@@ -984,8 +959,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             ConsumerImpl<T> newConsumer = null;
             try {
                 newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, 
internalConfig,
-                        client.externalExecutorProvider().getExecutor(), -1, 
true, subFuture, null,
-                        schema, interceptors,
+                        client.externalExecutorProvider(), -1,
+                        true, subFuture, null, schema, interceptors,
                         createIfDoesNotExist);
                 consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
@@ -1270,7 +1245,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         try {
                             newConsumer = ConsumerImpl.newConsumerImpl(
                                 client, partitionName, configurationData,
-                                
client.externalExecutorProvider().getExecutor(),
+                                client.externalExecutorProvider(),
                                 partitionIndex, true, subFuture, null, schema, 
interceptors,
                                 true /* createTopicIfDoesNotExist */);
                         } catch 
(PulsarClientException.InvalidConfigurationException e) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index eac72aa..f2e9f8a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -22,7 +22,6 @@ package org.apache.pulsar.client.impl;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.codec.digest.DigestUtils;
@@ -40,13 +39,14 @@ import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     private final MultiTopicsConsumerImpl<T> multiTopicsConsumer;
 
     public MultiTopicsReaderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> readerConfiguration,
-                                 ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) {
+                                 ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) {
         String subscription = "multiTopicsReader-" + 
DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
         if 
(StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) {
             subscription = readerConfiguration.getSubscriptionRolePrefix() + 
"-" + subscription;
@@ -98,7 +98,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
                             .ranges(readerConfiguration.getKeyHashRanges())
             );
         }
-        multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, 
consumerConfiguration, listenerExecutor, consumerFuture, schema,
+        multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, 
consumerConfiguration, executorProvider, consumerFuture, schema,
                 null, true, readerConfiguration.getStartMessageId(), 
readerConfiguration.getStartMessageFromRollbackDurationInSec());
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index db8bd81..7769d70 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -28,13 +28,14 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -52,10 +53,10 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
                                           PulsarClientImpl client,
                                           ConsumerConfigurationData<T> conf,
-                                          ExecutorService listenerExecutor,
+                                          ExecutorProvider executorProvider,
                                           CompletableFuture<Consumer<T>> 
subscribeFuture,
                                           Schema<T> schema, Mode 
subscriptionMode, ConsumerInterceptors<T> interceptors) {
-        super(client, conf, listenerExecutor, subscribeFuture, schema, 
interceptors,
+        super(client, conf, executorProvider, subscribeFuture, schema, 
interceptors,
                 false /* createTopicIfDoesNotExist */);
         this.topicsPattern = topicsPattern;
         this.subscriptionMode = subscriptionMode;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f2d2b53..a81a5d0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -421,15 +421,13 @@ public class PulsarClientImpl implements PulsarClient {
             }
 
             ConsumerBase<T> consumer;
-            // gets the next single threaded executor from the list of 
executors
-            ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             if (metadata.partitions > 0) {
                 consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
-                    listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
+                        externalExecutorProvider, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
             } else {
                 int partitionIndex = TopicName.getPartitionIndex(topic);
                 try {
-                    consumer = 
ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, 
listenerThread, partitionIndex, false,
+                    consumer = 
ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, 
externalExecutorProvider, partitionIndex, false,
                             consumerSubscribedFuture,null, schema, 
interceptors,
                             true /* createTopicIfDoesNotExist */);
                     consumers.add(consumer);
@@ -453,7 +451,7 @@ public class PulsarClientImpl implements PulsarClient {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
-                externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema, interceptors,
+                externalExecutorProvider, consumerSubscribedFuture, schema, 
interceptors,
                 true /* createTopicIfDoesNotExist */);
 
         consumers.add(consumer);
@@ -486,7 +484,7 @@ public class PulsarClientImpl implements PulsarClient {
                 ConsumerBase<T> consumer = new 
PatternMultiTopicsConsumerImpl<T>(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
                     conf,
-                    externalExecutorProvider.getExecutor(),
+                    externalExecutorProvider,
                     consumerSubscribedFuture,
                     schema, subscriptionMode, interceptors);
 
@@ -556,17 +554,15 @@ public class PulsarClientImpl implements PulsarClient {
                 return;
             }
             CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
-            // gets the next single threaded executor from the list of 
executors
-            ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             Reader<T> reader = null;
             ConsumerBase<T> consumer = null;
             if (metadata.partitions > 0) {
                 reader = new MultiTopicsReaderImpl<>(PulsarClientImpl.this,
-                        conf, listenerThread, consumerSubscribedFuture, 
schema);
+                        conf, externalExecutorProvider, 
consumerSubscribedFuture, schema);
                 consumer = ((MultiTopicsReaderImpl<T>) 
reader).getMultiTopicsConsumer();
             } else {
                 try {
-                    reader = new ReaderImpl<>(PulsarClientImpl.this, conf, 
listenerThread, consumerSubscribedFuture, schema);
+                    reader = new ReaderImpl<>(PulsarClientImpl.this, conf, 
externalExecutorProvider, consumerSubscribedFuture, schema);
                     consumer = ((ReaderImpl<T>) reader).getConsumer();
                 } catch (PulsarClientException.InvalidConfigurationException 
e) {
                     if (log.isDebugEnabled()) {
@@ -703,7 +699,7 @@ public class PulsarClientImpl implements PulsarClient {
         return timer;
     }
 
-    ExecutorProvider externalExecutorProvider() {
+    public ExecutorProvider externalExecutorProvider() {
         return externalExecutorProvider;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index e1744c7..8be2f27 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -21,13 +21,14 @@ package org.apache.pulsar.client.impl;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.TopicName;
 
 public class ReaderImpl<T> implements Reader<T> {
@@ -38,7 +39,7 @@ public class ReaderImpl<T> implements Reader<T> {
     private final ConsumerImpl<T> consumer;
 
     public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> 
readerConfiguration,
-                      ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema)
+                      ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema)
             throws PulsarClientException.InvalidConfigurationException {
 
         String subscription = "reader-" + 
DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
@@ -102,7 +103,7 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = 
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, 
readerConfiguration.getTopicName(), consumerConfiguration,
-                listenerExecutor, partitionIdx, false, consumerFuture,
+                executorProvider, partitionIdx, false, consumerFuture,
                 readerConfiguration.getStartMessageId(), 
readerConfiguration.getStartMessageFromRollbackDurationInSec(),
                 schema, null, true /* createTopicIfDoesNotExist */);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 8db542d..0faf596 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -37,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 
@@ -49,11 +49,11 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
     private volatile boolean waitingOnListenerForZeroQueueSize = false;
 
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-            MessageId startMessageId, Schema<T> schema,
-            ConsumerInterceptors<T> interceptors,
-            boolean createTopicIfDoesNotExist) throws 
PulsarClientException.InvalidConfigurationException {
-        super(client, topic, conf, listenerExecutor, partitionIndex, 
hasParentConsumer, subscribeFuture,
+             ExecutorProvider executorProvider, int partitionIndex, boolean 
hasParentConsumer,
+             CompletableFuture<Consumer<T>> subscribeFuture, MessageId 
startMessageId, Schema<T> schema,
+             ConsumerInterceptors<T> interceptors,
+             boolean createTopicIfDoesNotExist) throws 
PulsarClientException.InvalidConfigurationException {
+        super(client, topic, conf, executorProvider, partitionIndex, 
hasParentConsumer, subscribeFuture,
                 startMessageId, 0 /* startMessageRollbackDurationInSec */, 
schema, interceptors,
                 createTopicIfDoesNotExist);
     }
@@ -152,7 +152,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
         checkNotNull(listener, "listener can't be null");
         checkNotNull(message, "unqueued message can't be null");
 
-        listenerExecutor.execute(() -> {
+        pinnedExecutor.execute(() -> {
             stats.updateNumMsgsReceived(message);
             try {
                 if (log.isDebugEnabled()) {
@@ -172,7 +172,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
     }
 
     @Override
-    protected void triggerListener(int numMessages) {
+    protected void triggerListener() {
         // Ignore since it was already triggered in the 
triggerZeroQueueSizeListener() call
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 48fca39..6a446d0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -30,12 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 @Slf4j
 public class ExecutorProvider {
     private final int numThreads;
     private final List<ExecutorService> executors;
     private final AtomicInteger currentThread = new AtomicInteger(0);
+    private volatile boolean isShutdown;
 
     public ExecutorProvider(int numThreads, ThreadFactory threadFactory) {
         checkArgument(numThreads > 0);
@@ -45,12 +47,26 @@ public class ExecutorProvider {
         for (int i = 0; i < numThreads; i++) {
             
executors.add(Executors.newSingleThreadScheduledExecutor(threadFactory));
         }
+        isShutdown = false;
     }
 
     public ExecutorService getExecutor() {
         return executors.get((currentThread.getAndIncrement() & 
Integer.MAX_VALUE) % numThreads);
     }
 
+    public ExecutorService getExecutor(Object object) {
+        return getExecutorInternal(object == null ? -1 : object.hashCode() & 
Integer.MAX_VALUE);
+    }
+
+    public ExecutorService getExecutor(byte[] bytes) {
+        int keyHash = Murmur3_32Hash.getInstance().makeHash(bytes);
+        return getExecutorInternal(keyHash);
+    }
+
+    private ExecutorService getExecutorInternal(int hash) {
+        return executors.get((hash & Integer.MAX_VALUE) % numThreads);
+    }
+
     public void shutdownNow() {
         executors.forEach(executor -> {
             executor.shutdownNow();
@@ -60,5 +76,10 @@ public class ExecutorProvider {
                 log.warn("Shutdown of thread pool was interrupted");
             }
         });
+        isShutdown = true;
+    }
+
+    public boolean isShutdown() {
+        return isShutdown;
     }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 085a4a9..8bb7bbc 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoop;
 import io.netty.util.Timer;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.mockito.Mockito;
@@ -81,4 +82,8 @@ class ClientTestFixtures {
     public static ExecutorService createMockedExecutor() {
         return mock(ExecutorService.class);
     }
+
+    public static ExecutorProvider createMockedExecutorProvider() {
+        return mock(ExecutorProvider.class);
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 920fb33..bf65971 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -28,16 +28,16 @@ import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -45,7 +45,7 @@ import org.testng.annotations.Test;
 public class ConsumerImplTest {
 
 
-    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+    private final ExecutorProvider executorProvider = new ExecutorProvider(1, 
new DefaultThreadFactory("ConsumerImplTest"));
     private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
 
@@ -61,7 +61,7 @@ public class ConsumerImplTest {
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, false, subscribeFuture, null, null, null,
+                executorProvider, -1, false, subscribeFuture, null, null, null,
                 true);
         consumer.setState(HandlerState.State.Ready);
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 664e78a..b246a87 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.Test;
@@ -36,12 +37,9 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.regex.Pattern;
 
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
@@ -67,7 +65,7 @@ public class MultiTopicsConsumerImplTest {
 
         ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
         EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
-        ExecutorService listenerExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+        ExecutorProvider executorProvider = new ExecutorProvider(1, 
threadFactory);
 
         PulsarClientImpl clientImpl = new PulsarClientImpl(conf, 
eventLoopGroup);
 
@@ -78,7 +76,7 @@ public class MultiTopicsConsumerImplTest {
 
         MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
             clientImpl, consumerConfData,
-            listenerExecutor, null, null, null, true);
+            executorProvider, null, null, null, true);
 
         impl.getStats();
     }
@@ -105,7 +103,7 @@ public class MultiTopicsConsumerImplTest {
     }
 
     private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() {
-        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
         int completionDelayMillis = 100;
@@ -115,7 +113,7 @@ public class MultiTopicsConsumerImplTest {
                 new PartitionedTopicMetadata(), completionDelayMillis));
         when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), 
any()))
                 .thenReturn(CompletableFuture.completedFuture(schema));
-        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
             new CompletableFuture<>(), schema, null, true);
         return impl;
     }
@@ -146,7 +144,7 @@ public class MultiTopicsConsumerImplTest {
 
     @Test
     public void testConsumerCleanupOnSubscribeFailure() throws 
InterruptedException, TimeoutException, ExecutionException {
-        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
         consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", 
"c")));
@@ -156,7 +154,7 @@ public class MultiTopicsConsumerImplTest {
         
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createExceptionFuture(
                 new PulsarClientException.InvalidConfigurationException("a 
mock exception"), completionDelayMillis));
         CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
-        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
                 completeFuture, schema, null, true);
         // assert that we don't start in closed, then we move to closed and 
get an exception
         // indicating that closeAsync was called
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index d24841a..a79430b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -40,7 +40,8 @@ public class ReaderImplTest {
         ReaderConfigurationData<byte[]> readerConfiguration = new 
ReaderConfigurationData<>();
         readerConfiguration.setTopicName("topicName");
         CompletableFuture<Consumer<byte[]>> consumerFuture = new 
CompletableFuture<>();
-        reader = new ReaderImpl<>(mockedClient, readerConfiguration, 
ClientTestFixtures.createMockedExecutor(), consumerFuture, Schema.BYTES);
+        reader = new ReaderImpl<>(mockedClient, readerConfiguration, 
ClientTestFixtures.createMockedExecutorProvider(),
+                consumerFuture, Schema.BYTES);
     }
 
     @Test

Reply via email to