This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ca76f455128418206f04f0b3eb7a988925cc3ef1
Author: lipenghui <[email protected]>
AuthorDate: Tue Jun 28 11:29:32 2022 +0800

    [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)

    The scheduled executor is inefficient here because every task is first added to a
    DelayedWorkQueue (a priority queue), even when it is submitted through `.execute()`
    with no scheduling delay.

    <img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
    <img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">

    Profile result:
    [perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)

    Performance test for the maximum message read rate on a single topic:

    ```
    bin/pulsar-perf consume test -q 1000000 -p 100000000
    bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
    ```

    Without this PR (2.10.1):

    ```
    Profiling started
    2022-06-27T13:44:01,183+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851 msg/s --- 2.027 Mbit/s --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
    2022-06-27T13:44:11,196+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125 msg/s --- 2.112 Mbit/s --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
    2022-06-27T13:44:21,216+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861 msg/s --- 1.598 Mbit/s --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
    2022-06-27T13:44:31,233+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932 msg/s --- 1.878 Mbit/s --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
    2022-06-27T13:44:41,247+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313 msg/s --- 1.791 Mbit/s --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
    ```

    With this PR:

    ```
    Profiling started
    2022-06-27T13:56:20,426+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516 msg/s --- 8.235 Mbit/s --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
    2022-06-27T13:56:30,438+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852 msg/s --- 7.634 Mbit/s --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
    2022-06-27T13:56:40,450+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040 msg/s --- 7.629 Mbit/s --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
    2022-06-27T13:56:50,462+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458 msg/s --- 7.624 Mbit/s --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
    2022-06-27T13:57:00,475+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584 msg/s --- 7.635 Mbit/s --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
    ```

    Profile result with this PR:
    [perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)

    - Change internal executor and external executor to normal executor service
    - Added a new ScheduledExecutorProvider to handle the scheduled tasks.

    (cherry picked from commit 96237a9615fefa2bed247b416bf1a12d8bc4b201)
---
 .../transaction/pendingack/PendingAckStore.java    |  4 +-
 .../pendingack/impl/InMemoryPendingAckStore.java   |  4 +-
 .../pendingack/impl/MLPendingAckStore.java         |  4 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |  4 +-
 .../persistent/PersistentSubscriptionTest.java     |  4 +-
 .../pulsar/client/api/MultiTopicsConsumerTest.java |  2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 ++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 13 +++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 48 +++++++++++-----------
 .../pulsar/client/impl/PulsarClientImpl.java       | 12 +++++-
 .../pulsar/client/util/ExecutorProvider.java       | 10 +++--
 .../client/util/ScheduledExecutorProvider.java     | 36 ++++++++++++++++
 12 files changed, 99 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
index 3da676eb827..2f85d2430db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -38,7 +38,7 @@ public interface PendingAckStore {
      * @param pendingAckHandle the handle of pending ack
      * @param executorService the replay executor service
      */
-    void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService);
+    void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService);
 
     /**
      * Close the transaction pending ack store.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
index d882c80c478..44c9fbe039b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 public class InMemoryPendingAckStore implements PendingAckStore {
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) {
         pendingAckHandle.changeToReadyState();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index e6d16fb7eae..af4e664b1e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -110,7 +110,7 @@ public class MLPendingAckStore implements PendingAckStore {
     }
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) {
         transactionReplayExecutor
                 .execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle)));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 5b808f1dedb..41ef25b3e4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -156,8 +155,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         this.pendingAckStoreFuture =
                 pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
         this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-            pendingAckStore.replayAsync(this,
-                    (ScheduledExecutorService) internalPinnedExecutor);
+            pendingAckStore.replayAsync(this, internalPinnedExecutor);
         }).exceptionally(e -> {
             acceptQueue.clear();
             changeToErrorState();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index b9304cb5fb8..946f90a1ddd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -127,7 +127,7 @@ public class PersistentSubscriptionTest {
             public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
                 return CompletableFuture.completedFuture(new PendingAckStore() {
                     @Override
-                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) {
+                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) {
                         try {
                             Field field = PendingAckHandleState.class.getDeclaredField("state");
                             field.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index c8686bfa9c8..8bcb1824902 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -78,7 +78,7 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         PulsarClientImpl client = new PulsarClientImpl(conf) {
             {
                 ScheduledExecutorService internalExecutorService =
-                        (ScheduledExecutorService) super.getInternalExecutorService();
+                        (ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor();
                 internalExecutorServiceDelegate = mock(ScheduledExecutorService.class,
                         // a spy isn't used since that doesn't work for private classes, instead
                         // the mock delegatesTo an existing instance. A delegate is sufficient for verifying
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 71fb2d62756..c53d49ad4bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -69,8 +68,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
-    protected final ScheduledExecutorService externalPinnedExecutor;
-    protected final ScheduledExecutorService internalPinnedExecutor;
+    protected final ExecutorService externalPinnedExecutor;
+    protected final ExecutorService internalPinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -102,8 +101,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.unAckedChunkedMessageIdSequenceMap =
                 ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
-        this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
-        this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
+        this.externalPinnedExecutor = executorProvider.getExecutor();
+        this.internalPinnedExecutor = client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3bcf95e7813..1a185d4c17d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1267,10 +1268,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            internalPinnedExecutor
-                    .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
-                            expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
-                            TimeUnit.MILLISECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
+                    () -> internalPinnedExecutor
+                            .execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
+                    expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
+                    TimeUnit.MILLISECONDS
+            );
             expireChunkMessageTaskScheduled = true;
         }
 
@@ -2236,7 +2239,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            internalPinnedExecutor.schedule(() -> {
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 71fef6f83f0..28e63816d7c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -24,6 +24,28 @@ import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -45,31 +67,8 @@ import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -280,7 +279,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
-            internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
+                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
             return null;
         });
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3f14558a7ed..9a4bada3278 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvid
 import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -99,6 +100,8 @@ public class PulsarClientImpl implements PulsarClient {
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorProvider;
+
+    private final ScheduledExecutorProvider scheduledExecutorProvider;
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
 
@@ -184,6 +187,8 @@ public class PulsarClientImpl implements PulsarClient {
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
+                    "pulsar-client-scheduled");
             if (conf.getServiceUrl().startsWith("http")) {
                 lookup = new HttpLookupService(conf, this.eventLoopGroup);
             } else {
@@ -949,7 +954,7 @@ public class PulsarClientImpl implements PulsarClient {
                 }
                 previousExceptions.add(e);
 
-                ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> {
+                ((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> {
                     log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
                         + "Will try again in {} ms", topicName, nextDelay);
                     remainingTime.addAndGet(-nextDelay);
@@ -1071,6 +1076,11 @@ public class PulsarClientImpl implements PulsarClient {
     public ExecutorService getInternalExecutorService() {
         return internalExecutorProvider.getExecutor();
     }
+
+    public ScheduledExecutorProvider getScheduledExecutorProvider() {
+        return scheduledExecutorProvider;
+    }
+
     //
     // Transaction related API
     //
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 1318d5665ae..db11358057f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -28,7 +28,6 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -43,7 +42,7 @@ public class ExecutorProvider {
     private final String poolName;
     private volatile boolean isShutdown;
 
-    private static class ExtendedThreadFactory extends DefaultThreadFactory {
+    protected static class ExtendedThreadFactory extends DefaultThreadFactory {
 
         @Getter
         private Thread thread;
@@ -58,7 +57,6 @@ public class ExecutorProvider {
         }
     }
 
-
     public ExecutorProvider(int numThreads, String poolName) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
@@ -67,13 +65,17 @@ public class ExecutorProvider {
         for (int i = 0; i < numThreads; i++) {
             ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
                     poolName, Thread.currentThread().isDaemon());
-            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+            ExecutorService executor = createExecutor(threadFactory);
             executors.add(Pair.of(executor, threadFactory));
         }
         isShutdown = false;
         this.poolName = poolName;
     }
 
+    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+        return Executors.newSingleThreadExecutor(threadFactory);
+    }
+
     public ExecutorService getExecutor() {
         return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
new file mode 100644
index 00000000000..887ae3bb7ff
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ScheduledExecutorProvider extends ExecutorProvider {
+
+    public ScheduledExecutorProvider(int numThreads, String poolName) {
+        super(numThreads, poolName);
+    }
+
+    @Override
+    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+        return Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
+}
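
Illustrative sketch (not part of the commit): the chunk-expiry hunk in ConsumerImpl uses the scheduled executor only as a timer and hands the actual work back to the pinned plain executor via `execute()`. The standalone example below shows that hand-off using only JDK classes. The class name `TimerHandoffSketch`, the method `runDelayedOnWorker`, and the thread names are hypothetical and do not exist in Pulsar.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TimerHandoffSketch {

    /**
     * Schedules one delayed task on a timer thread, hands its body to a plain
     * worker executor, and returns the name of the thread that ran the body.
     */
    static String runDelayedOnWorker(long delayMillis) {
        // Plain executor: stands in for the "pinned" executor that runs the work.
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        // Scheduled executor: only fires the timer, it does not run the body.
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "sketch-timer"));
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            timer.schedule(() -> worker.execute(() -> {
                ranOn.set(Thread.currentThread().getName());
                done.countDown();
            }), delayMillis, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
            return ranOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            timer.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDelayedOnWorker(50)); // prints "sketch-worker"
    }
}
```

With this split, undelayed work submitted through `execute()` goes only to the plain executor's queue, and only genuinely delayed tasks pass through the scheduled executor's delay queue.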

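
Illustrative sketch (not part of the commit): ExecutorProvider now builds its executors through an overridable `createExecutor(...)` method, and ScheduledExecutorProvider overrides it to return scheduled executors. The simplified example below mirrors that shape with JDK classes only. `ProviderSketch`, `PlainProvider`, `ScheduledProvider`, and `isScheduled` are hypothetical names, and the thread factory argument is omitted for brevity. It also shows why call sites cast the result of `getExecutor()`: the provider object itself is not a `ScheduledExecutorService`. In the MultiTopicsConsumerImpl hunk as shown, the cast is applied to the return value of `getScheduledExecutorProvider()` rather than to `getExecutor()`, which, going by the classes in this diff, would be expected to fail at runtime with a `ClassCastException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public class ProviderSketch {

    /** Round-robin pool of single-thread executors; subclasses pick the executor type. */
    static class PlainProvider {
        private final List<ExecutorService> executors = new ArrayList<>();
        private final AtomicInteger next = new AtomicInteger();

        PlainProvider(int numThreads) {
            for (int i = 0; i < numThreads; i++) {
                // Factory method called from the constructor, as in the diff;
                // safe here only because overrides do not touch subclass fields.
                executors.add(createExecutor());
            }
        }

        protected ExecutorService createExecutor() {
            return Executors.newSingleThreadExecutor();
        }

        ExecutorService getExecutor() {
            // Mask the sign bit so the index stays non-negative after overflow.
            return executors.get((next.getAndIncrement() & Integer.MAX_VALUE) % executors.size());
        }

        void shutdown() {
            executors.forEach(ExecutorService::shutdownNow);
        }
    }

    /** Same pool, but every executor supports schedule(...) and scheduleAtFixedRate(...). */
    static class ScheduledProvider extends PlainProvider {
        ScheduledProvider(int numThreads) {
            super(numThreads);
        }

        @Override
        protected ExecutorService createExecutor() {
            return Executors.newSingleThreadScheduledExecutor();
        }
    }

    /** Reports whether the executors handed out can be cast to ScheduledExecutorService. */
    static boolean isScheduled(PlainProvider provider) {
        try {
            return provider.getExecutor() instanceof ScheduledExecutorService;
        } finally {
            provider.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(isScheduled(new PlainProvider(2)));     // prints "false"
        System.out.println(isScheduled(new ScheduledProvider(2))); // prints "true"
    }
}
```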