This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bad716ec3739b7fb2760f6f534306fcb778fc0c6 Author: lipenghui <[email protected]> AuthorDate: Sun Feb 21 11:12:13 2021 +0800 Able to handling messages with multiple listener threads in order for the Key_Shared subscription. (#9329) Currently, a consumer is pinged to a given listener thread to ensure the message is processed ordered for a topic. But for the Key_Shared subscription, the message process order is based on the message key, not the order of the topic. So this PR is able to handle messages with multiple listener threads for the Key_Shared subscription which also keeps the order of message key. (cherry picked from commit e4523e09b38514bba87a9fe9842f43d9f7cdce34) --- .../apache/pulsar/client/impl/RawReaderImpl.java | 2 +- .../client/api/KeySharedSubscriptionTest.java | 70 +++++++++++++++++++ .../pulsar/client/impl/TopicsConsumerImplTest.java | 7 ++ .../apache/pulsar/client/impl/ConsumerBase.java | 70 +++++++++++++++++-- .../apache/pulsar/client/impl/ConsumerImpl.java | 66 +++++------------- .../client/impl/MultiTopicsConsumerImpl.java | 79 ++++++++-------------- .../pulsar/client/impl/MultiTopicsReaderImpl.java | 6 +- .../impl/PatternMultiTopicsConsumerImpl.java | 7 +- .../pulsar/client/impl/PulsarClientImpl.java | 18 ++--- .../org/apache/pulsar/client/impl/ReaderImpl.java | 7 +- .../pulsar/client/impl/ZeroQueueConsumerImpl.java | 10 +-- .../pulsar/client/util/ExecutorProvider.java | 21 ++++++ .../pulsar/client/impl/ClientTestFixtures.java | 5 ++ .../pulsar/client/impl/ConsumerImplTest.java | 8 +-- .../client/impl/MultiTopicsConsumerImplTest.java | 16 ++--- .../apache/pulsar/client/impl/ReaderImplTest.java | 3 +- 16 files changed, 247 insertions(+), 148 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 4146d40..f57d0bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -113,7 +113,7 @@ public class RawReaderImpl implements RawReader { super(client, conf.getSingleTopic(), conf, - client.externalExecutorProvider().getExecutor(), + client.externalExecutorProvider(), TopicName.getPartitionIndex(conf.getSingleTopic()), false, consumerFuture, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 50011d4..e9b30b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,6 +47,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcher import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.Murmur3_32Hash; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -67,6 +69,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { }; } + @DataProvider(name = "partitioned") + public Object[][] partitionedProvider() { + return new Object[][] { + { false }, + { true } + }; + } + @DataProvider(name = "data") public Object[][] dataProvider() { return new Object[][] { @@ -855,6 +865,66 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS)); } + @Test(dataProvider = "partitioned") + public void testOrderingWithConsumerListener(boolean partitioned) throws Exception { + final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID(); + if (partitioned) { + admin.topics().createPartitionedTopic(topic, 3); + } + final String subName = "my-sub"; + final int messages = 1000; + List<Message<Integer>> received = Collections.synchronizedList(new ArrayList<>(1000)); + Random random = new Random(); + PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .listenerThreads(8) + .build(); + + Consumer<Integer> consumer = client.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(new MessageListener<Integer>() { + @Override + public void received(Consumer<Integer> consumer, Message<Integer> msg) { + try { + Thread.sleep(random.nextInt(5)); + received.add(msg); + } catch (InterruptedException ignore) { + } + } + }) + .subscribe(); + + + Producer<Integer> producer = client.newProducer(Schema.INT32) + .topic(topic) + .create(); + + String[] keys = new String[]{"key-1", "key-2", "key-3"}; + for (int i = 0; i < messages; i++) { + producer.newMessage().key(keys[i % 3]).value(i).send(); + } + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(received.size(), messages)); + + Map<String, Integer> maxValueOfKeys = new HashMap<>(); + for (Message<Integer> msg : received) { + String key = msg.getKey(); + Integer value = msg.getValue(); + if (maxValueOfKeys.containsKey(key)) { + Assert.assertTrue(value > maxValueOfKeys.get(key)); + } + maxValueOfKeys.put(key, value); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + client.close(); + } + private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException { return pulsarClient.newConsumer(Schema.STRING) .topic(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 17da94a..216438b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -20,11 +20,16 @@ package org.apache.pulsar.client.impl; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; +import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRouter; @@ -37,6 +42,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dc4d2c1..30c3d01 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,8 +20,9 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; + +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; @@ -29,7 +30,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.Lock; @@ -52,7 +53,9 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.Murmur3_32Hash; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; @@ -70,7 +73,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected final CompletableFuture<Consumer<T>> subscribeFuture; protected final MessageListener<T> listener; protected final ConsumerEventListener consumerEventListener; - protected final ExecutorService listenerExecutor; + protected final ExecutorProvider executorProvider; + protected final ScheduledExecutorService pinnedExecutor; final BlockingQueue<Message<T>> incomingMessages; protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunckedMessageIdSequenceMap; protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives; @@ -86,7 +90,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected final Lock reentrantLock = new ReentrantLock(); protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - int receiverQueueSize, ExecutorService listenerExecutor, + int receiverQueueSize, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) { super(client, topic); this.maxReceiverQueueSize = receiverQueueSize; @@ -99,8 +103,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T // Always use growable queue since items can exceed the advertised size this.incomingMessages = new GrowableArrayBlockingQueue<>(); this.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>(); - - this.listenerExecutor = listenerExecutor; + this.executorProvider = executorProvider; + this.pinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor(); this.pendingReceives = Queues.newConcurrentLinkedQueue(); this.schema = schema; this.interceptors = interceptors; @@ -232,7 +236,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) { - listenerExecutor.execute(() -> { + pinnedExecutor.execute(() -> { if (!receivedFuture.complete(message)) { log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}", receivedFuture.isCancelled(), message); @@ -842,6 +846,58 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } } + protected void triggerListener() { + // Trigger the notification on the message listener in a separate thread to avoid blocking the networking + // thread while the message processing happens + Message<T> msg; + do { + try { + msg = internalReceive(0, TimeUnit.MILLISECONDS); + if (msg != null) { + final Message<T> finalMsg = msg; + if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { + executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() -> + callMessageListener(finalMsg)); + } else { + pinnedExecutor.execute(() -> callMessageListener(finalMsg)); + } + } + } catch (PulsarClientException e) { + log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); + return; + } + } while (msg != null); + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); + } + } + + protected void callMessageListener(Message<T> msg) { + try { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, + msg.getMessageId()); + } + listener.received(ConsumerBase.this, msg); + } catch (Throwable t) { + log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, + msg.getMessageId(), t); + } + } + + protected static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8); + protected byte[] peekMessageKey(Message<T> msg) { + byte[] key = NONE_KEY; + if (msg.hasKey()) { + key = msg.getKeyBytes(); + } + if (msg.hasOrderingKey()) { + key = msg.getOrderingKey(); + } + return key; + } + protected MessagesImpl<T> getNewMessagesImpl() { return new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(), batchReceivePolicy.getMaxNumBytes()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index d4055b0..ef39bfe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,8 +48,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -81,6 +79,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; @@ -195,7 +194,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, @@ -203,14 +202,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { - return newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, + return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture, startMessageId, schema, interceptors, createTopicIfDoesNotExist, 0); } static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, @@ -220,23 +219,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle boolean createTopicIfDoesNotExist, long startMessageRollbackDurationInSec) { if (conf.getReceiverQueueSize() == 0) { - return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, + return new ZeroQueueConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture, startMessageId, schema, interceptors, createTopicIfDoesNotExist); } else { - return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, + return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture, startMessageId, startMessageRollbackDurationInSec /* rollback time in sec to start msgId */, schema, interceptors, createTopicIfDoesNotExist); } } protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId, long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { - super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors); + super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema, interceptors); this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; @@ -1095,7 +1094,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private void failPendingReceive() { lock.readLock().lock(); try { - if (listenerExecutor != null && !listenerExecutor.isShutdown()) { + if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) { failPendingReceives(this.pendingReceives); failPendingBatchReceives(this.pendingBatchReceives); } @@ -1109,7 +1108,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return; } - listenerExecutor.execute(() -> { + pinnedExecutor.execute(() -> { if (isActive) { consumerEventListener.becameActive(this, partitionIndex); } else { @@ -1227,7 +1226,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } if (listener != null) { - triggerListener(numMessages); + triggerListener(); } } @@ -1240,7 +1239,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // Lazy task scheduling to expire incomplete chunk message if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) { - ((ScheduledExecutorService) listenerExecutor).scheduleAtFixedRate(() -> { + pinnedExecutor.scheduleAtFixedRate(() -> { removeExpireIncompleteChunkedMessages(); }, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, TimeUnit.MILLISECONDS); @@ -1317,39 +1316,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return uncompressedPayload; } - protected void triggerListener(int numMessages) { - // Trigger the notification on the message listener in a separate thread to avoid blocking the networking - // thread while the message processing happens - listenerExecutor.execute(() -> { - for (int i = 0; i < numMessages; i++) { - try { - Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); - // complete the callback-loop in case queue is cleared up - if (msg == null) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); - } - break; - } - try { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, - msg.getMessageId()); - } - listener.received(ConsumerImpl.this, msg); - } catch (Throwable t) { - log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, - msg.getMessageId(), t); - } - - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; - } - } - }); - } - /** * Notify waiting asyncReceive request with the received message * @@ -1367,13 +1333,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } if (exception != null) { - listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); + pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); return; } if (message == null) { IllegalStateException e = new IllegalStateException("received message can't be null"); - listenerExecutor.execute(() -> receivedFuture.completeExceptionally(e)); + pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e)); return; } @@ -2199,9 +2165,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return; } - ((ScheduledExecutorService) listenerExecutor).schedule(() -> { + pinnedExecutor.schedule(() -> { log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", - topic, getHandlerName(), nextDelay); + topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); internalGetLastMessageIdAsync(backoff, remainingTime, future); }, nextDelay, TimeUnit.MILLISECONDS); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8efab22..b53a342 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; @@ -38,6 +41,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -59,7 +63,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -68,8 +71,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; +import org.apache.pulsar.client.util.ExecutorProvider; public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { @@ -108,31 +110,33 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { private final long startMessageRollbackDurationInSec; MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { - this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor, + this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, executorProvider, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist); } MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, - ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, - long startMessageRollbackDurationInSec) { - this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor, - subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, startMessageId, startMessageRollbackDurationInSec); + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, + ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, + long startMessageRollbackDurationInSec) { + this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, executorProvider, + subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, startMessageId, + startMessageRollbackDurationInSec); } MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, - ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { - this(client, singleTopic, conf, listenerExecutor, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, null, 0); + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, + ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { + this(client, singleTopic, conf, executorProvider, subscribeFuture, schema, interceptors, + createTopicIfDoesNotExist, null, 0); } MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, - ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, - long startMessageRollbackDurationInSec) { - super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, + ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, + long startMessageRollbackDurationInSec) { + super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors); checkArgument(conf.getReceiverQueueSize() > 0, @@ -282,34 +286,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } if (listener != null) { - // Trigger the notification on the message listener in a separate thread to avoid blocking the networking - // thread while the message processing happens - listenerExecutor.execute(() -> { - Message<T> msg; - try { - msg = internalReceive(0, TimeUnit.MILLISECONDS); - if (msg == null) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); - } - return; - } - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; - } - - try { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", - topic, subscription, message.getMessageId()); - } - listener.received(MultiTopicsConsumerImpl.this, msg); - } catch (Throwable t) { - log.error("[{}][{}] Message listener error in processing message: {}", - topic, subscription, message, t); - } - }); + triggerListener(); } } @@ -597,7 +574,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } private void failPendingReceive() { - if (listenerExecutor != null && !listenerExecutor.isShutdown()) { + if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) { failPendingReceives(pendingReceives); failPendingBatchReceives(pendingBatchReceives); } @@ -856,7 +833,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { // first create a consumer with no topic, then do subscription for already know partitionedTopic. public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, int numPartitions, Schema<T> schema, ConsumerInterceptors<T> interceptors) { @@ -868,7 +845,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { cloneConf.getTopicNames().remove(topicName); CompletableFuture<Consumer> future = new CompletableFuture<>(); - MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor, + MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, executorProvider, future, schema, interceptors, true /* createTopicIfDoesNotExist */); future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions)) @@ -952,7 +929,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName, - configurationData, client.externalExecutorProvider().getExecutor(), + configurationData, client.externalExecutorProvider(), partitionIndex, true, subFuture, startMessageId, schema, interceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); @@ -973,7 +950,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, - client.externalExecutorProvider().getExecutor(), -1, true, subFuture, null, + client.externalExecutorProvider(), -1, true, subFuture, null, schema, interceptors, createIfDoesNotExist); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); @@ -1250,7 +1227,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig(); ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl( client, partitionName, configurationData, - client.externalExecutorProvider().getExecutor(), + client.externalExecutorProvider(), partitionIndex, true, subFuture, null, schema, interceptors, true /* createTopicIfDoesNotExist */); synchronized (pauseMutex) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index 9824b54..06e8245 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.codec.digest.DigestUtils; @@ -43,13 +42,14 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.client.util.ExecutorProvider; public class MultiTopicsReaderImpl<T> implements Reader<T> { private final MultiTopicsConsumerImpl<T> multiTopicsConsumer; public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) { + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) { String subscription = "multiTopicsReader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10); if (StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) { subscription = readerConfiguration.getSubscriptionRolePrefix() + "-" + subscription; @@ -98,7 +98,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> { .ranges(readerConfiguration.getKeyHashRanges()) ); } - multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, consumerConfiguration, listenerExecutor, consumerFuture, schema, + multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, consumerConfiguration, executorProvider, consumerFuture, schema, null, true, readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 91b994a..af161f0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -28,14 +28,15 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; + import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -52,10 +53,10 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, PulsarClientImpl client, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, Mode subscriptionMode, ConsumerInterceptors<T> interceptors) { - super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors, + super(client, conf, executorProvider, subscribeFuture, schema, interceptors, false /* createTopicIfDoesNotExist */); this.topicsPattern = topicsPattern; this.subscriptionMode = subscriptionMode; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 149c617..43b8acb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -372,14 +372,12 @@ public class PulsarClientImpl implements PulsarClient { } ConsumerBase<T> consumer; - // gets the next single threaded executor from the list of executors - ExecutorService listenerThread = externalExecutorProvider.getExecutor(); if (metadata.partitions > 0) { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, - listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); + externalExecutorProvider, consumerSubscribedFuture, metadata.partitions, schema, interceptors); } else { int partitionIndex = TopicName.getPartitionIndex(topic); - consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false, + consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, externalExecutorProvider, partitionIndex, false, consumerSubscribedFuture,null, schema, interceptors, true /* createTopicIfDoesNotExist */); } @@ -398,7 +396,7 @@ public class PulsarClientImpl implements PulsarClient { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf, - externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors, + externalExecutorProvider, consumerSubscribedFuture, schema, interceptors, true /* createTopicIfDoesNotExist */); consumers.add(consumer); @@ -431,7 +429,7 @@ public class PulsarClientImpl implements PulsarClient { ConsumerBase<T> consumer = new PatternMultiTopicsConsumerImpl<T>(conf.getTopicsPattern(), PulsarClientImpl.this, conf, - externalExecutorProvider.getExecutor(), + externalExecutorProvider, consumerSubscribedFuture, schema, subscriptionMode, interceptors); @@ -501,16 +499,14 @@ public class PulsarClientImpl implements PulsarClient { return; } CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); - // gets the next single threaded executor from the list of executors - ExecutorService listenerThread = externalExecutorProvider.getExecutor(); Reader<T> reader; ConsumerBase<T> consumer; if (metadata.partitions > 0) { reader = new MultiTopicsReaderImpl<>(PulsarClientImpl.this, - conf, listenerThread, consumerSubscribedFuture, schema); + conf, externalExecutorProvider, consumerSubscribedFuture, schema); consumer = ((MultiTopicsReaderImpl<T>) reader).getMultiTopicsConsumer(); } else { - reader = new ReaderImpl<>(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, schema); + reader = new ReaderImpl<>(PulsarClientImpl.this, conf, externalExecutorProvider, consumerSubscribedFuture, schema); consumer = ((ReaderImpl<T>) reader).getConsumer(); } @@ -641,7 +637,7 @@ public class PulsarClientImpl implements PulsarClient { return timer; } - ExecutorProvider externalExecutorProvider() { + public ExecutorProvider externalExecutorProvider() { return externalExecutorProvider; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 146f5fc..66ff338 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -21,13 +21,14 @@ package org.apache.pulsar.client.impl; import java.io.IOException; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; + import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.TopicName; public class ReaderImpl<T> implements Reader<T> { @@ -38,7 +39,7 @@ public class ReaderImpl<T> implements Reader<T> { private final ConsumerImpl<T> consumer; public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) { + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture, Schema<T> schema) { String subscription = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10); if (StringUtils.isNotBlank(readerConfiguration.getSubscriptionRolePrefix())) { @@ -98,7 +99,7 @@ public class ReaderImpl<T> implements Reader<T> { final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName()); consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, - listenerExecutor, partitionIdx, false, consumerFuture, + executorProvider, partitionIdx, false, consumerFuture, readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(), schema, null, true /* createTopicIfDoesNotExist */); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index ba6e97d..03d1315 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -39,6 +38,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.client.util.ExecutorProvider; @Slf4j public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { @@ -49,11 +49,11 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { private volatile boolean waitingOnListenerForZeroQueueSize = false; public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { - super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, + super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture, startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors, createTopicIfDoesNotExist); } @@ -152,7 +152,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { checkNotNull(listener, "listener can't be null"); checkNotNull(message, "unqueued message can't be null"); - listenerExecutor.execute(() -> { + pinnedExecutor.execute(() -> { stats.updateNumMsgsReceived(message); try { if (log.isDebugEnabled()) { @@ -172,7 +172,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { } @Override - protected void triggerListener(int numMessages) { + protected void triggerListener() { // Ignore since it was already triggered in the triggerZeroQueueSizeListener() call } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index 48fca39..6a446d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -30,12 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.Murmur3_32Hash; @Slf4j public class ExecutorProvider { private final int numThreads; private final List<ExecutorService> executors; private final AtomicInteger currentThread = new AtomicInteger(0); + private volatile boolean isShutdown; public ExecutorProvider(int numThreads, ThreadFactory threadFactory) { checkArgument(numThreads > 0); @@ -45,12 +47,26 @@ public class ExecutorProvider { for (int i = 0; i < numThreads; i++) { executors.add(Executors.newSingleThreadScheduledExecutor(threadFactory)); } + isShutdown = false; } public ExecutorService getExecutor() { return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads); } + public ExecutorService getExecutor(Object object) { + return getExecutorInternal(object == null ? -1 : object.hashCode() & Integer.MAX_VALUE); + } + + public ExecutorService getExecutor(byte[] bytes) { + int keyHash = Murmur3_32Hash.getInstance().makeHash(bytes); + return getExecutorInternal(keyHash); + } + + private ExecutorService getExecutorInternal(int hash) { + return executors.get((hash & Integer.MAX_VALUE) % numThreads); + } + public void shutdownNow() { executors.forEach(executor -> { executor.shutdownNow(); @@ -60,5 +76,10 @@ public class ExecutorProvider { log.warn("Shutdown of thread pool was interrupted"); } }); + isShutdown = true; + } + + public boolean isShutdown() { + return isShutdown; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java index 085a4a9..8bb7bbc 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; import io.netty.util.Timer; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.mockito.Mockito; @@ -81,4 +82,8 @@ class ClientTestFixtures { public static ExecutorService createMockedExecutor() { return mock(ExecutorService.class); } + + public static ExecutorProvider createMockedExecutorProvider() { + return mock(ExecutorProvider.class); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 75d7753..dec7c2a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -28,16 +28,16 @@ import static org.mockito.Mockito.verify; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -45,7 +45,7 @@ import org.testng.annotations.Test; public class ConsumerImplTest { - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest")); private ConsumerImpl<byte[]> consumer; private ConsumerConfigurationData consumerConf; @@ -61,7 +61,7 @@ public class ConsumerImplTest { consumerConf.setSubscriptionName("test-sub"); consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf, - executorService, -1, false, subscribeFuture, null, null, null, + executorProvider, -1, false, subscribeFuture, null, null, null, true); consumer.setState(HandlerState.State.Ready); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 664e78a..b246a87 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.testng.annotations.Test; @@ -36,12 +37,9 @@ import java.util.Arrays; import java.util.HashSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.regex.Pattern; import static org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture; import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture; @@ -67,7 +65,7 @@ public class MultiTopicsConsumerImplTest { ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon()); EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory); - ExecutorService listenerExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); + ExecutorProvider executorProvider = new ExecutorProvider(1, threadFactory); PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup); @@ -78,7 +76,7 @@ public class MultiTopicsConsumerImplTest { MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl( clientImpl, consumerConfData, - listenerExecutor, null, null, null, true); + executorProvider, null, null, null, true); impl.getStats(); } @@ -105,7 +103,7 @@ public class MultiTopicsConsumerImplTest { } private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() { - ExecutorService listenerExecutor = mock(ExecutorService.class); + ExecutorProvider executorProvider = mock(ExecutorProvider.class); ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>(); consumerConfData.setSubscriptionName("subscriptionName"); int completionDelayMillis = 100; @@ -115,7 +113,7 @@ public class MultiTopicsConsumerImplTest { new PartitionedTopicMetadata(), completionDelayMillis)); when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), any())) .thenReturn(CompletableFuture.completedFuture(schema)); - MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor, + MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider, new CompletableFuture<>(), schema, null, true); return impl; } @@ -146,7 +144,7 @@ public class MultiTopicsConsumerImplTest { @Test public void testConsumerCleanupOnSubscribeFailure() throws InterruptedException, TimeoutException, ExecutionException { - ExecutorService listenerExecutor = mock(ExecutorService.class); + ExecutorProvider executorProvider = mock(ExecutorProvider.class); ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>(); consumerConfData.setSubscriptionName("subscriptionName"); consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c"))); @@ -156,7 +154,7 @@ public class MultiTopicsConsumerImplTest { when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture( new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis)); CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>(); - MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor, + MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider, completeFuture, schema, null, true); // assert that we don't start in closed, then we move to closed and get an exception // indicating that closeAsync was called diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java index e4d50db..6e587f7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java @@ -39,7 +39,8 @@ public class ReaderImplTest { ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>(); readerConfiguration.setTopicName("topicName"); CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>(); - reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutor(), consumerFuture, Schema.BYTES); + reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(), + consumerFuture, Schema.BYTES); } @Test
