This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 97fdb483e020e4a77bc3c354e5a1acb5d24ed424 Author: Lari Hotari <[email protected]> AuthorDate: Wed Jan 19 08:48:02 2022 +0200 [Client] Support passing existing executor providers to the client (#12037) In load and performance testing, there's a need to simulate production use cases and production workloads. For this purpose, it would be useful to be able to share the thread pools used by Pulsar client instances in order to be able to run a large amount of Pulsar clients in a single JVM without the overhead of a lot of threads. In the current solution, it's already possible to share the EventLoopGroup and HashedWheelTimer instances. The solution for sharing the thread pools for the external / internal executors was missing. This PR adds support for that. Example usage: ```java // shared thread pool related resources ExecutorProvider internalExecutorProvider = new ExecutorProvider(8, "shared-internal-executor"); ExecutorProvider externalExecutorProvider = new ExecutorProvider(8, "shared-external-executor"); Timer sharedTimer = new HashedWheelTimer(getThreadFactory("shared-pulsar-timer"), 1, TimeUnit.MILLISECONDS); EventLoopGroup sharedEventLoopGroup = new EpollEventLoopGroup(); // example of creating a client which uses the shared thread pools PulsarClientImpl client = PulsarClientImpl.builder().conf(conf) .internalExecutorProvider(internalExecutorProvider) .externalExecutorProvider(externalExecutorProvider) .timer(sharedTimer) .eventLoopGroup(sharedEventLoopGroup) .build(); ``` It seems that this would also improve the performance of the Pulsar Proxy since new thread pools for every client connection. That happens in the Pulsar Proxy currently: https://github.com/apache/pulsar/blob/af63e96d4aaa0ae4c4086583aa4f9b1edd72279b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L445-L451 An optimization was added in #9802 for sharing the timer, but it would be useful to also share the internal / external executors. (cherry picked from commit 4591a210a8d924d99811edfdfeb7433452e5eeb4) --- .../pulsar/client/impl/PulsarClientImpl.java | 170 ++++++++++++--------- .../pulsar/client/impl/PulsarClientImplTest.java | 32 ++++ 2 files changed, 130 insertions(+), 72 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 7c318f25a8e..3f14558a7ed 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -19,18 +19,15 @@ package org.apache.pulsar.client.impl; import static org.apache.commons.lang3.StringUtils.isBlank; - import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; - import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; -import java.io.IOException; import java.net.InetSocketAddress; import java.time.Clock; import java.util.ArrayList; @@ -50,7 +47,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; - +import lombok.Builder; import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -94,13 +91,14 @@ public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); protected final ClientConfigurationData conf; + private final boolean createdExecutorProviders; private LookupService lookup; private final ConnectionPool cnxPool; @Getter private final Timer timer; private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; - private final ExecutorProvider internalExecutorService; + private final ExecutorProvider internalExecutorProvider; private final boolean createdEventLoopGroup; private final boolean createdCnxPool; @@ -115,20 +113,22 @@ public class PulsarClientImpl implements PulsarClient { private final AtomicLong producerIdGenerator = new AtomicLong(); private final AtomicLong consumerIdGenerator = new AtomicLong(); - private final AtomicLong requestIdGenerator - = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE/2)); + private final AtomicLong requestIdGenerator = + new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); protected final EventLoopGroup eventLoopGroup; private final MemoryLimitController memoryLimitController; - private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, SchemaInfoProvider>() { + private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache = + CacheBuilder.newBuilder().maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES) + .build(new CacheLoader<String, SchemaInfoProvider>() { - @Override - public SchemaInfoProvider load(String topicName) { - return newSchemaProvider(topicName); - } - }); + @Override + public SchemaInfoProvider load(String topicName) { + return newSchemaProvider(topicName); + } + }); private final Clock clientClock; @@ -136,48 +136,59 @@ public class PulsarClientImpl implements PulsarClient { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, getEventLoopGroup(conf), true); + this(conf, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true); + this(conf, eventLoopGroup, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, false, false); + this(conf, eventLoopGroup, cnxPool, null, null, null); } - public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, + Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, false, false); + this(conf, eventLoopGroup, cnxPool, timer, null, null); } - private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup) - throws PulsarClientException { - this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true); - } - - private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer, - boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException { + @Builder(builderClassName = "PulsarClientImplBuilder") + private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, + Timer timer, ExecutorProvider externalExecutorProvider, + ExecutorProvider internalExecutorProvider) throws PulsarClientException { + EventLoopGroup eventLoopGroupReference = null; + ConnectionPool connectionPoolReference = null; try { - this.createdEventLoopGroup = createdEventLoopGroup; - this.createdCnxPool = createdCnxPool; - if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) { + this.createdEventLoopGroup = eventLoopGroup == null; + this.createdCnxPool = connectionPool == null; + if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) { + throw new IllegalArgumentException( + "Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified."); + } + this.createdExecutorProviders = externalExecutorProvider == null; + eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); + this.eventLoopGroup = eventLoopGroupReference; + if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) { throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } - this.eventLoopGroup = eventLoopGroup; setAuth(conf); this.conf = conf; clientClock = conf.getClock(); conf.getAuthentication().start(); - this.cnxPool = cnxPool; - externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); - internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + connectionPoolReference = + connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup); + this.cnxPool = connectionPoolReference; + this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : + new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); + this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : + new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); if (conf.getServiceUrl().startsWith("http")) { - lookup = new HttpLookupService(conf, eventLoopGroup); + lookup = new HttpLookupService(conf, this.eventLoopGroup); } else { - lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor()); + lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), + conf.isUseTls(), this.externalExecutorProvider.getExecutor()); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -200,8 +211,8 @@ public class PulsarClientImpl implements PulsarClient { state.set(State.Open); } catch (Throwable t) { shutdown(); - shutdownEventLoopGroup(eventLoopGroup); - closeCnxPool(cnxPool); + shutdownEventLoopGroup(eventLoopGroupReference); + closeCnxPool(connectionPoolReference); throw t; } } @@ -279,11 +290,13 @@ public class PulsarClientImpl implements PulsarClient { if (schema instanceof AutoConsumeSchema) { return FutureUtil.failedFuture( - new PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only used by consumers to detect schemas automatically")); + new PulsarClientException.InvalidConfigurationException( + "AutoConsumeSchema is only used by consumers to detect schemas automatically")); } if (state.get() != State.Open) { - return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed : state = " + state.get())); + return FutureUtil.failedFuture( + new PulsarClientException.AlreadyClosedException("Client already closed : state = " + state.get())); } String topic = conf.getTopicName(); @@ -360,7 +373,8 @@ public class PulsarClientImpl implements PulsarClient { ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors, - CompletableFuture<Producer<T>> producerCreatedFuture, + CompletableFuture<Producer<T>> + producerCreatedFuture, PartitionedTopicMetadata metadata) { return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, producerCreatedFuture, schema, interceptors); @@ -394,7 +408,8 @@ public class PulsarClientImpl implements PulsarClient { return subscribeAsync(conf, Schema.BYTES, null); } - public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { + public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, + ConsumerInterceptors<T> interceptors) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); } @@ -406,7 +421,8 @@ public class PulsarClientImpl implements PulsarClient { for (String topic : conf.getTopicNames()) { if (!TopicName.isValid(topic)) { - return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'")); + return FutureUtil.failedFuture( + new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'")); } } @@ -442,12 +458,16 @@ public class PulsarClientImpl implements PulsarClient { } } - private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { + private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, + Schema<T> schema, + ConsumerInterceptors<T> interceptors) { return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic()) - .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors)); + .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors)); } - private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { + private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, + Schema<T> schema, + ConsumerInterceptors<T> interceptors) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); String topic = conf.getSingleTopic(); @@ -477,7 +497,9 @@ public class PulsarClientImpl implements PulsarClient { return consumerSubscribedFuture; } - private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { + private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, + Schema<T> schema, + ConsumerInterceptors<T> interceptors) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf, @@ -504,9 +526,9 @@ public class PulsarClientImpl implements PulsarClient { lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode) .thenAccept(topics -> { if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); + log.debug("Get topics under namespace {}, topics.size: {}", namespaceName, topics.size()); topics.forEach(topicName -> - log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName)); + log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName)); } List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern()); @@ -600,7 +622,8 @@ public class PulsarClientImpl implements PulsarClient { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } - if (metadata.partitions > 0 && MultiTopicsConsumerImpl.isIllegalMultiTopicsMessageId(conf.getStartMessageId())) { + if (metadata.partitions > 0 && + MultiTopicsConsumerImpl.isIllegalMultiTopicsMessageId(conf.getStartMessageId())) { readerFuture.completeExceptionally( new PulsarClientException("The partitioned topic startMessageId is illegal")); return; @@ -613,7 +636,8 @@ public class PulsarClientImpl implements PulsarClient { conf, externalExecutorProvider, consumerSubscribedFuture, schema); consumer = ((MultiTopicsReaderImpl<T>) reader).getMultiTopicsConsumer(); } else { - reader = new ReaderImpl<>(PulsarClientImpl.this, conf, externalExecutorProvider, consumerSubscribedFuture, schema); + reader = new ReaderImpl<>(PulsarClientImpl.this, conf, externalExecutorProvider, + consumerSubscribedFuture, schema); consumer = ((ReaderImpl<T>) reader).getConsumer(); } @@ -644,7 +668,8 @@ public class PulsarClientImpl implements PulsarClient { topicName = TopicName.get(topic); } catch (Throwable t) { return FutureUtil - .failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'")); + .failedFuture( + new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'")); } return lookup.getSchema(topicName); @@ -787,27 +812,29 @@ public class PulsarClientImpl implements PulsarClient { } private void shutdownExecutors() throws PulsarClientException { - PulsarClientException pulsarClientException = null; + if (createdExecutorProviders) { + PulsarClientException pulsarClientException = null; - if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) { - try { - externalExecutorProvider.shutdownNow(); - } catch (Throwable t) { - log.warn("Failed to shutdown externalExecutorProvider", t); - pulsarClientException = PulsarClientException.unwrap(t); + if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) { + try { + externalExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown externalExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); + } } - } - if (internalExecutorService != null && !internalExecutorService.isShutdown()) { - try { - internalExecutorService.shutdownNow(); - } catch (Throwable t) { - log.warn("Failed to shutdown internalExecutorService", t); - pulsarClientException = PulsarClientException.unwrap(t); + if (internalExecutorProvider != null && !internalExecutorProvider.isShutdown()) { + try { + internalExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown internalExecutorService", t); + pulsarClientException = PulsarClientException.unwrap(t); + } } - } - if (pulsarClientException != null) { - throw pulsarClientException; + if (pulsarClientException != null) { + throw pulsarClientException; + } } } @@ -923,8 +950,8 @@ public class PulsarClientImpl implements PulsarClient { previousExceptions.add(e); ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> { - log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- Will try again in {} ms", - topicName, nextDelay); + log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- " + + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions); }, nextDelay, TimeUnit.MILLISECONDS); @@ -1042,7 +1069,7 @@ public class PulsarClientImpl implements PulsarClient { } public ExecutorService getInternalExecutorService() { - return internalExecutorService.getExecutor(); + return internalExecutorProvider.getExecutor(); } // // Transaction related API @@ -1057,5 +1084,4 @@ public class PulsarClientImpl implements PulsarClient { } return new TransactionBuilderImpl(this, tcClient); } - } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 3f1e667517e..386294e24b7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -48,10 +48,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.regex.Pattern; +import lombok.Cleanup; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -217,4 +219,34 @@ public class PulsarClientImplTest { assertFalse(eventLoopGroup.isShutdown()); } } + + @Test + public void testInitializingWithExecutorProviders() throws PulsarClientException { + ClientConfigurationData conf = clientImpl.conf; + @Cleanup("shutdownNow") + ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor"); + @Cleanup + PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf) + .internalExecutorProvider(executorProvider) + .externalExecutorProvider(executorProvider) + .build(); + @Cleanup + PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf) + .internalExecutorProvider(executorProvider) + .externalExecutorProvider(executorProvider) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Both externalExecutorProvider and internalExecutorProvider must be " + + "specified or unspecified.") + public void testBothExecutorProvidersMustBeSpecified() throws PulsarClientException { + ClientConfigurationData conf = clientImpl.conf; + @Cleanup("shutdownNow") + ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor"); + @Cleanup + PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf) + .internalExecutorProvider(executorProvider) + .build(); + } }
