This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ca76f455128418206f04f0b3eb7a988925cc3ef1
Author: lipenghui <[email protected]>
AuthorDate: Tue Jun 28 11:29:32 2022 +0800

    [improve][java-client] Replace ScheduledExecutor to improve performance of 
message consumption (#16236)
    
    The Scheduled Executor doesn't work very efficiently because each task will 
add to a DelayedQueue(A priority queue) first even if using the `.execute()` 
method without any schedule delay.
    
    <img width="1845" alt="image" 
src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png";>
    
    <img width="1848" alt="image" 
src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png";>
    
    Profile result:
    
[perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
    
    Running a performance test for single topic max message read rate test:
    
    ```
    bin/pulsar-perf consume test -q 1000000 -p 100000000
    bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
    ```
    
    Without this PR (2.10.1):
    
    ```
    Profiling started
    2022-06-27T13:44:01,183+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
23919664 msg --- 265702.851  msg/s --- 2.027 Mbit/s  --- Latency: mean: 
49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 
99.99pct: 53056 - Max: 53057
    2022-06-27T13:44:11,196+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
26690802 msg --- 276759.125  msg/s --- 2.112 Mbit/s  --- Latency: mean: 
56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 
99.99pct: 60042 - Max: 60042
    2022-06-27T13:44:21,216+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
28788693 msg --- 209467.861  msg/s --- 1.598 Mbit/s  --- Latency: mean: 
63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 
99.99pct: 67548 - Max: 67548
    2022-06-27T13:44:31,233+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
31255365 msg --- 246190.932  msg/s --- 1.878 Mbit/s  --- Latency: mean: 
71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 
99.99pct: 74847 - Max: 74847
    2022-06-27T13:44:41,247+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
33606630 msg --- 234769.313  msg/s --- 1.791 Mbit/s  --- Latency: mean: 
78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 
99.99pct: 82285 - Max: 82286
    ```
    
    With this PR:
    
    ```
    Profiling started
    2022-06-27T13:56:20,426+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
431272207 msg --- 1079360.516  msg/s --- 8.235 Mbit/s  --- Latency: mean: 
272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 
- Max: 524
    2022-06-27T13:56:30,438+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
441292346 msg --- 1000645.852  msg/s --- 7.634 Mbit/s  --- Latency: mean: 
15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 
55
    2022-06-27T13:56:40,450+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
451303308 msg --- 999973.040  msg/s --- 7.629 Mbit/s  --- Latency: mean: 18.265 
ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
    2022-06-27T13:56:50,462+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
461308082 msg --- 999309.458  msg/s --- 7.624 Mbit/s  --- Latency: mean: 14.728 
ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
    2022-06-27T13:57:00,475+0800 [main] INFO  
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 
471327606 msg --- 1000738.584  msg/s --- 7.635 Mbit/s  --- Latency: mean: 
21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 
66
    ```
    
    Profile result with this PR:
    
    
[perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
    
    - Change internal executor and external executor to normal executor service
    - Added a new ScheduledExecutorProvider to handle the scheduled tasks.
    
    (cherry picked from commit 96237a9615fefa2bed247b416bf1a12d8bc4b201)
---
 .../transaction/pendingack/PendingAckStore.java    |  4 +-
 .../pendingack/impl/InMemoryPendingAckStore.java   |  4 +-
 .../pendingack/impl/MLPendingAckStore.java         |  4 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |  4 +-
 .../persistent/PersistentSubscriptionTest.java     |  4 +-
 .../pulsar/client/api/MultiTopicsConsumerTest.java |  2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 ++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 13 +++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 48 +++++++++++-----------
 .../pulsar/client/impl/PulsarClientImpl.java       | 12 +++++-
 .../pulsar/client/util/ExecutorProvider.java       | 10 +++--
 .../client/util/ScheduledExecutorProvider.java     | 36 ++++++++++++++++
 12 files changed, 99 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
index 3da676eb827..2f85d2430db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -38,7 +38,7 @@ public interface PendingAckStore {
      * @param pendingAckHandle the handle of pending ack
      * @param executorService the replay executor service
      */
-    void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ScheduledExecutorService executorService);
+    void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService 
executorService);
 
     /**
      * Close the transaction pending ack store.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
index d882c80c478..44c9fbe039b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 public class InMemoryPendingAckStore implements PendingAckStore {
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ScheduledExecutorService scheduledExecutorService) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ExecutorService scheduledExecutorService) {
         pendingAckHandle.changeToReadyState();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index e6d16fb7eae..af4e664b1e3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -110,7 +110,7 @@ public class MLPendingAckStore implements PendingAckStore {
     }
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ScheduledExecutorService transactionReplayExecutor) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ExecutorService transactionReplayExecutor) {
         transactionReplayExecutor
                 .execute(new PendingAckReplay(new 
MLPendingAckReplyCallBack(pendingAckHandle)));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 5b808f1dedb..41ef25b3e4d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -156,8 +155,7 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
                 this.pendingAckStoreFuture =
                         
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
                 this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                    pendingAckStore.replayAsync(this,
-                            (ScheduledExecutorService) internalPinnedExecutor);
+                    pendingAckStore.replayAsync(this, internalPinnedExecutor);
                 }).exceptionally(e -> {
                     acceptQueue.clear();
                     changeToErrorState();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index b9304cb5fb8..946f90a1ddd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -127,7 +127,7 @@ public class PersistentSubscriptionTest {
             public CompletableFuture<PendingAckStore> 
newPendingAckStore(PersistentSubscription subscription) {
                 return CompletableFuture.completedFuture(new PendingAckStore() 
{
                     @Override
-                    public void replayAsync(PendingAckHandleImpl 
pendingAckHandle, ScheduledExecutorService executorService) {
+                    public void replayAsync(PendingAckHandleImpl 
pendingAckHandle, ExecutorService executorService) {
                         try {
                             Field field = 
PendingAckHandleState.class.getDeclaredField("state");
                             field.setAccessible(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index c8686bfa9c8..8bcb1824902 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -78,7 +78,7 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
         PulsarClientImpl client = new PulsarClientImpl(conf) {
             {
                 ScheduledExecutorService internalExecutorService =
-                        (ScheduledExecutorService) 
super.getInternalExecutorService();
+                        (ScheduledExecutorService) 
super.getScheduledExecutorProvider().getExecutor();
                 internalExecutorServiceDelegate = 
mock(ScheduledExecutorService.class,
                         // a spy isn't used since that doesn't work for 
private classes, instead
                         // the mock delegatesTo an existing instance. A 
delegate is sufficient for verifying
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 71fb2d62756..c53d49ad4bd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -69,8 +68,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
-    protected final ScheduledExecutorService externalPinnedExecutor;
-    protected final ScheduledExecutorService internalPinnedExecutor;
+    protected final ExecutorService externalPinnedExecutor;
+    protected final ExecutorService internalPinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> 
unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
@@ -102,8 +101,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         this.unAckedChunkedMessageIdSequenceMap =
                 ConcurrentOpenHashMap.<MessageIdImpl, 
MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
-        this.externalPinnedExecutor = (ScheduledExecutorService) 
executorProvider.getExecutor();
-        this.internalPinnedExecutor = (ScheduledExecutorService) 
client.getInternalExecutorService();
+        this.externalPinnedExecutor = executorProvider.getExecutor();
+        this.internalPinnedExecutor = client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3bcf95e7813..1a185d4c17d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1267,10 +1268,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && 
expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            internalPinnedExecutor
-                    
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
-                            expireTimeOfIncompleteChunkedMessageMillis, 
expireTimeOfIncompleteChunkedMessageMillis,
-                            TimeUnit.MILLISECONDS);
+            ((ScheduledExecutorService) 
client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
+                    () -> internalPinnedExecutor
+                            
.execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
+                    expireTimeOfIncompleteChunkedMessageMillis, 
expireTimeOfIncompleteChunkedMessageMillis,
+                    TimeUnit.MILLISECONDS
+            );
             expireChunkMessageTaskScheduled = true;
         }
 
@@ -2236,7 +2239,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return;
             }
 
-            internalPinnedExecutor.schedule(() -> {
+            ((ScheduledExecutorService) 
client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while 
getLastMessageId -- Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 71fef6f83f0..28e63816d7c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -24,6 +24,28 @@ import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -45,31 +67,8 @@ import 
org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -280,7 +279,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying 
later", consumer, ex);
-            internalPinnedExecutor.schedule(() -> 
receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
+                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, 
TimeUnit.SECONDS);
             return null;
         });
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3f14558a7ed..9a4bada3278 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -75,6 +75,7 @@ import 
org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvid
 import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
 import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -99,6 +100,8 @@ public class PulsarClientImpl implements PulsarClient {
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorProvider;
+
+    private final ScheduledExecutorProvider scheduledExecutorProvider;
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
 
@@ -184,6 +187,8 @@ public class PulsarClientImpl implements PulsarClient {
                     new ExecutorProvider(conf.getNumListenerThreads(), 
"pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? 
internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-internal");
+            this.scheduledExecutorProvider = new 
ScheduledExecutorProvider(conf.getNumIoThreads(),
+                    "pulsar-client-scheduled");
             if (conf.getServiceUrl().startsWith("http")) {
                 lookup = new HttpLookupService(conf, this.eventLoopGroup);
             } else {
@@ -949,7 +954,7 @@ public class PulsarClientImpl implements PulsarClient {
             }
             previousExceptions.add(e);
 
-            ((ScheduledExecutorService) 
externalExecutorProvider.getExecutor()).schedule(() -> {
+            ((ScheduledExecutorService) 
scheduledExecutorProvider.getExecutor()).schedule(() -> {
                 log.warn("[topic: {}] Could not get connection while 
getPartitionedTopicMetadata -- "
                         + "Will try again in {} ms", topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
@@ -1071,6 +1076,11 @@ public class PulsarClientImpl implements PulsarClient {
     public ExecutorService getInternalExecutorService() {
         return internalExecutorProvider.getExecutor();
     }
+
+    public ScheduledExecutorProvider getScheduledExecutorProvider() {
+        return scheduledExecutorProvider;
+    }
+
     //
     // Transaction related API
     //
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 1318d5665ae..db11358057f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -28,7 +28,6 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -43,7 +42,7 @@ public class ExecutorProvider {
     private final String poolName;
     private volatile boolean isShutdown;
 
-    private static class ExtendedThreadFactory extends DefaultThreadFactory {
+    protected static class ExtendedThreadFactory extends DefaultThreadFactory {
 
         @Getter
         private Thread thread;
@@ -58,7 +57,6 @@ public class ExecutorProvider {
         }
     }
 
-
     public ExecutorProvider(int numThreads, String poolName) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
@@ -67,13 +65,17 @@ public class ExecutorProvider {
         for (int i = 0; i < numThreads; i++) {
             ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
                     poolName, Thread.currentThread().isDaemon());
-            ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+            ExecutorService executor = createExecutor(threadFactory);
             executors.add(Pair.of(executor, threadFactory));
         }
         isShutdown = false;
         this.poolName = poolName;
     }
 
+    protected ExecutorService createExecutor(ExtendedThreadFactory 
threadFactory) {
+       return Executors.newSingleThreadExecutor(threadFactory);
+    }
+
     public ExecutorService getExecutor() {
         return executors.get((currentThread.getAndIncrement() & 
Integer.MAX_VALUE) % numThreads).getKey();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
new file mode 100644
index 00000000000..887ae3bb7ff
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ScheduledExecutorProvider extends ExecutorProvider {
+
+    public ScheduledExecutorProvider(int numThreads, String poolName) {
+        super(numThreads, poolName);
+    }
+
+    @Override
+    protected ExecutorService createExecutor(ExtendedThreadFactory 
threadFactory) {
+        return Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
+}

Reply via email to