This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 97fdb483e020e4a77bc3c354e5a1acb5d24ed424
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 19 08:48:02 2022 +0200

    [Client] Support passing existing executor providers to the client (#12037)
    
    In load and performance testing, there's a need to simulate production use 
cases and production workloads.
    For this purpose, it would be useful to be able to share the thread pools 
used by Pulsar client instances in order to be able to run a large number of 
Pulsar clients in a single JVM without the overhead of a lot of threads.
    In the current solution, it's already possible to share the EventLoopGroup 
and HashedWheelTimer instances.
    The solution for sharing the thread pools for the external / internal 
executors was missing. This PR adds support for that.
    
    Example usage:
    
    ```java
    // shared thread pool related resources
    ExecutorProvider internalExecutorProvider = new ExecutorProvider(8, 
"shared-internal-executor");
    ExecutorProvider externalExecutorProvider = new ExecutorProvider(8, 
"shared-external-executor");
    Timer sharedTimer = new 
HashedWheelTimer(getThreadFactory("shared-pulsar-timer"), 1, 
TimeUnit.MILLISECONDS);
    EventLoopGroup sharedEventLoopGroup = new EpollEventLoopGroup();
    
    // example of creating a client which uses the shared thread pools
    PulsarClientImpl client = PulsarClientImpl.builder().conf(conf)
                    .internalExecutorProvider(internalExecutorProvider)
                    .externalExecutorProvider(externalExecutorProvider)
                    .timer(sharedTimer)
                    .eventLoopGroup(sharedEventLoopGroup)
                    .build();
    ```
    
    It seems that this would also improve the performance of the Pulsar Proxy, 
since new thread pools are created for every client connection.
    
    That happens in the Pulsar Proxy currently:
    
https://github.com/apache/pulsar/blob/af63e96d4aaa0ae4c4086583aa4f9b1edd72279b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L445-L451
    
    An optimization was added in #9802 for sharing the timer, but it would be 
useful to also share the internal / external executors.
    
    (cherry picked from commit 4591a210a8d924d99811edfdfeb7433452e5eeb4)
---
 .../pulsar/client/impl/PulsarClientImpl.java       | 170 ++++++++++++---------
 .../pulsar/client/impl/PulsarClientImplTest.java   |  32 ++++
 2 files changed, 130 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 7c318f25a8e..3f14558a7ed 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -19,18 +19,15 @@
 package org.apache.pulsar.client.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
-
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.util.ArrayList;
@@ -50,7 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
+import lombok.Builder;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -94,13 +91,14 @@ public class PulsarClientImpl implements PulsarClient {
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
 
     protected final ClientConfigurationData conf;
+    private final boolean createdExecutorProviders;
     private LookupService lookup;
     private final ConnectionPool cnxPool;
     @Getter
     private final Timer timer;
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
-    private final ExecutorProvider internalExecutorService;
+    private final ExecutorProvider internalExecutorProvider;
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
 
@@ -115,20 +113,22 @@ public class PulsarClientImpl implements PulsarClient {
 
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private final AtomicLong consumerIdGenerator = new AtomicLong();
-    private final AtomicLong requestIdGenerator
-        = new AtomicLong(ThreadLocalRandom.current().nextLong(0, 
Long.MAX_VALUE/2));
+    private final AtomicLong requestIdGenerator =
+            new AtomicLong(ThreadLocalRandom.current().nextLong(0, 
Long.MAX_VALUE / 2));
 
     protected final EventLoopGroup eventLoopGroup;
     private final MemoryLimitController memoryLimitController;
 
-    private final LoadingCache<String, SchemaInfoProvider> 
schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000)
-                    .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<String, SchemaInfoProvider>() {
+    private final LoadingCache<String, SchemaInfoProvider> 
schemaProviderLoadingCache =
+            CacheBuilder.newBuilder().maximumSize(100000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES)
+                    .build(new CacheLoader<String, SchemaInfoProvider>() {
 
-                @Override
-                public SchemaInfoProvider load(String topicName) {
-                    return newSchemaProvider(topicName);
-                }
-            });
+                        @Override
+                        public SchemaInfoProvider load(String topicName) {
+                            return newSchemaProvider(topicName);
+                        }
+                    });
 
     private final Clock clientClock;
 
@@ -136,48 +136,59 @@ public class PulsarClientImpl implements PulsarClient {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws 
PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), 
null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
-    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool, Timer timer)
+    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool,
+                            Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), 
null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean 
createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider 
externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws 
PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || 
eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != 
(internalExecutorProvider == null)) {
+                throw new IllegalArgumentException(
+                        "Both externalExecutorProvider and 
internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup 
: getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || 
this.eventLoopGroup == null) {
                 throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new 
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new 
ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference =
+                    connectionPool != null ? connectionPool : new 
ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? 
externalExecutorProvider :
+                    new ExecutorProvider(conf.getNumListenerThreads(), 
"pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? 
internalExecutorProvider :
+                    new ExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-internal");
             if (conf.getServiceUrl().startsWith("http")) {
-                lookup = new HttpLookupService(conf, eventLoopGroup);
+                lookup = new HttpLookupService(conf, this.eventLoopGroup);
             } else {
-                lookup = new BinaryProtoLookupService(this, 
conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), 
externalExecutorProvider.getExecutor());
+                lookup = new BinaryProtoLookupService(this, 
conf.getServiceUrl(), conf.getListenerName(),
+                        conf.isUseTls(), 
this.externalExecutorProvider.getExecutor());
             }
             if (timer == null) {
                 this.timer = new 
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
@@ -200,8 +211,8 @@ public class PulsarClientImpl implements PulsarClient {
             state.set(State.Open);
         } catch (Throwable t) {
             shutdown();
-            shutdownEventLoopGroup(eventLoopGroup);
-            closeCnxPool(cnxPool);
+            shutdownEventLoopGroup(eventLoopGroupReference);
+            closeCnxPool(connectionPoolReference);
             throw t;
         }
     }
@@ -279,11 +290,13 @@ public class PulsarClientImpl implements PulsarClient {
 
         if (schema instanceof AutoConsumeSchema) {
             return FutureUtil.failedFuture(
-                new 
PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only 
used by consumers to detect schemas automatically"));
+                    new PulsarClientException.InvalidConfigurationException(
+                            "AutoConsumeSchema is only used by consumers to 
detect schemas automatically"));
         }
 
         if (state.get() != State.Open) {
-            return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed : state = " 
+ state.get()));
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.AlreadyClosedException("Client 
already closed : state = " + state.get()));
         }
 
         String topic = conf.getTopicName();
@@ -360,7 +373,8 @@ public class PulsarClientImpl implements PulsarClient {
                                                                         
ProducerConfigurationData conf,
                                                                         
Schema<T> schema,
                                                                         
ProducerInterceptors interceptors,
-                                                                        
CompletableFuture<Producer<T>> producerCreatedFuture,
+                                                                        
CompletableFuture<Producer<T>>
+                                                                               
 producerCreatedFuture,
                                                                         
PartitionedTopicMetadata metadata) {
         return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, 
conf, metadata.partitions,
                 producerCreatedFuture, schema, interceptors);
@@ -394,7 +408,8 @@ public class PulsarClientImpl implements PulsarClient {
         return subscribeAsync(conf, Schema.BYTES, null);
     }
 
-    public <T> CompletableFuture<Consumer<T>> 
subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
+    public <T> CompletableFuture<Consumer<T>> 
subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema,
+                                                             
ConsumerInterceptors<T> interceptors) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
         }
@@ -406,7 +421,8 @@ public class PulsarClientImpl implements PulsarClient {
 
         for (String topic : conf.getTopicNames()) {
             if (!TopicName.isValid(topic)) {
-                return FutureUtil.failedFuture(new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic 
+ "'"));
+                return FutureUtil.failedFuture(
+                        new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic 
+ "'"));
             }
         }
 
@@ -442,12 +458,16 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
-    private <T> CompletableFuture<Consumer<T>> 
singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
+    private <T> CompletableFuture<Consumer<T>> 
singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf,
+                                                                         
Schema<T> schema,
+                                                                         
ConsumerInterceptors<T> interceptors) {
         return preProcessSchemaBeforeSubscribe(this, schema, 
conf.getSingleTopic())
-            .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, 
schemaClone, interceptors));
+                .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, 
schemaClone, interceptors));
     }
 
-    private <T> CompletableFuture<Consumer<T>> 
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> 
schema, ConsumerInterceptors<T> interceptors) {
+    private <T> CompletableFuture<Consumer<T>> 
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf,
+                                                                           
Schema<T> schema,
+                                                                           
ConsumerInterceptors<T> interceptors) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -477,7 +497,9 @@ public class PulsarClientImpl implements PulsarClient {
         return consumerSubscribedFuture;
     }
 
-    private <T> CompletableFuture<Consumer<T>> 
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
+    private <T> CompletableFuture<Consumer<T>> 
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf,
+                                                                        
Schema<T> schema,
+                                                                        
ConsumerInterceptors<T> interceptors) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
@@ -504,9 +526,9 @@ public class PulsarClientImpl implements PulsarClient {
         lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode)
             .thenAccept(topics -> {
                 if (log.isDebugEnabled()) {
-                    log.debug("Get topics under namespace {}, topics.size: 
{}", namespaceName.toString(), topics.size());
+                    log.debug("Get topics under namespace {}, topics.size: 
{}", namespaceName, topics.size());
                     topics.forEach(topicName ->
-                        log.debug("Get topics under namespace {}, topic: {}", 
namespaceName.toString(), topicName));
+                        log.debug("Get topics under namespace {}, topic: {}", 
namespaceName, topicName));
                 }
 
                 List<String> topicsList = topicsPatternFilter(topics, 
conf.getTopicsPattern());
@@ -600,7 +622,8 @@ public class PulsarClientImpl implements PulsarClient {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
-            if (metadata.partitions > 0 && 
MultiTopicsConsumerImpl.isIllegalMultiTopicsMessageId(conf.getStartMessageId()))
 {
+            if (metadata.partitions > 0 &&
+                    
MultiTopicsConsumerImpl.isIllegalMultiTopicsMessageId(conf.getStartMessageId()))
 {
                 readerFuture.completeExceptionally(
                         new PulsarClientException("The partitioned topic 
startMessageId is illegal"));
                 return;
@@ -613,7 +636,8 @@ public class PulsarClientImpl implements PulsarClient {
                         conf, externalExecutorProvider, 
consumerSubscribedFuture, schema);
                 consumer = ((MultiTopicsReaderImpl<T>) 
reader).getMultiTopicsConsumer();
             } else {
-                reader = new ReaderImpl<>(PulsarClientImpl.this, conf, 
externalExecutorProvider, consumerSubscribedFuture, schema);
+                reader = new ReaderImpl<>(PulsarClientImpl.this, conf, 
externalExecutorProvider,
+                        consumerSubscribedFuture, schema);
                 consumer = ((ReaderImpl<T>) reader).getConsumer();
             }
 
@@ -644,7 +668,8 @@ public class PulsarClientImpl implements PulsarClient {
             topicName = TopicName.get(topic);
         } catch (Throwable t) {
             return FutureUtil
-                    .failedFuture(new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic 
+ "'"));
+                    .failedFuture(
+                            new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic 
+ "'"));
         }
 
         return lookup.getSchema(topicName);
@@ -787,27 +812,29 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private void shutdownExecutors() throws PulsarClientException {
-        PulsarClientException pulsarClientException = null;
+        if (createdExecutorProviders) {
+            PulsarClientException pulsarClientException = null;
 
-        if (externalExecutorProvider != null && 
!externalExecutorProvider.isShutdown()) {
-            try {
-                externalExecutorProvider.shutdownNow();
-            } catch (Throwable t) {
-                log.warn("Failed to shutdown externalExecutorProvider", t);
-                pulsarClientException = PulsarClientException.unwrap(t);
+            if (externalExecutorProvider != null && 
!externalExecutorProvider.isShutdown()) {
+                try {
+                    externalExecutorProvider.shutdownNow();
+                } catch (Throwable t) {
+                    log.warn("Failed to shutdown externalExecutorProvider", t);
+                    pulsarClientException = PulsarClientException.unwrap(t);
+                }
             }
-        }
-        if (internalExecutorService != null && 
!internalExecutorService.isShutdown()) {
-            try {
-                internalExecutorService.shutdownNow();
-            } catch (Throwable t) {
-                log.warn("Failed to shutdown internalExecutorService", t);
-                pulsarClientException = PulsarClientException.unwrap(t);
+            if (internalExecutorProvider != null && 
!internalExecutorProvider.isShutdown()) {
+                try {
+                    internalExecutorProvider.shutdownNow();
+                } catch (Throwable t) {
+                    log.warn("Failed to shutdown internalExecutorService", t);
+                    pulsarClientException = PulsarClientException.unwrap(t);
+                }
             }
-        }
 
-        if (pulsarClientException != null) {
-            throw pulsarClientException;
+            if (pulsarClientException != null) {
+                throw pulsarClientException;
+            }
         }
     }
 
@@ -923,8 +950,8 @@ public class PulsarClientImpl implements PulsarClient {
             previousExceptions.add(e);
 
             ((ScheduledExecutorService) 
externalExecutorProvider.getExecutor()).schedule(() -> {
-                log.warn("[topic: {}] Could not get connection while 
getPartitionedTopicMetadata -- Will try again in {} ms",
-                    topicName, nextDelay);
+                log.warn("[topic: {}] Could not get connection while 
getPartitionedTopicMetadata -- "
+                        + "Will try again in {} ms", topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
                 getPartitionedTopicMetadata(topicName, backoff, remainingTime, 
future, previousExceptions);
             }, nextDelay, TimeUnit.MILLISECONDS);
@@ -1042,7 +1069,7 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public ExecutorService getInternalExecutorService() {
-        return internalExecutorService.getExecutor();
+        return internalExecutorProvider.getExecutor();
     }
     //
     // Transaction related API
@@ -1057,5 +1084,4 @@ public class PulsarClientImpl implements PulsarClient {
         }
         return new TransactionBuilderImpl(this, tcClient);
     }
-
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 3f1e667517e..386294e24b7 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -48,10 +48,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.regex.Pattern;
 
+import lombok.Cleanup;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -217,4 +219,34 @@ public class PulsarClientImplTest {
             assertFalse(eventLoopGroup.isShutdown());
         }
     }
+
+    @Test
+    public void testInitializingWithExecutorProviders() throws 
PulsarClientException {
+        ClientConfigurationData conf = clientImpl.conf;
+        @Cleanup("shutdownNow")
+        ExecutorProvider executorProvider = new ExecutorProvider(2, 
"shared-executor");
+        @Cleanup
+        PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .externalExecutorProvider(executorProvider)
+                .build();
+        @Cleanup
+        PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .externalExecutorProvider(executorProvider)
+                .build();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Both externalExecutorProvider 
and internalExecutorProvider must be " +
+                    "specified or unspecified.")
+    public void testBothExecutorProvidersMustBeSpecified() throws 
PulsarClientException {
+        ClientConfigurationData conf = clientImpl.conf;
+        @Cleanup("shutdownNow")
+        ExecutorProvider executorProvider = new ExecutorProvider(2, 
"shared-executor");
+        @Cleanup
+        PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .build();
+    }
 }

Reply via email to