This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a75a22ddd5fd80d447fc7ed50d07c4a291a057b5 Author: Lari Hotari <[email protected]> AuthorDate: Thu Jan 20 20:16:19 2022 +0200 [Broker] Use shared executors for broker and geo-replication clients (#13839) * [Broker] Use shared executors for broker clients and geo-replication clients * Remove brokerClientNumIOThreads configuration key and default to 1 * Revisit the shared timer creation - don't ever make it a daemon thread (cherry picked from commit 4924e6d54a8fa4fb1a01f48644c23c625d0407f2) --- .../org/apache/pulsar/broker/PulsarService.java | 32 +++++++++++++++++++++- .../pulsar/broker/namespace/NamespaceService.java | 3 +- .../pulsar/broker/service/BrokerService.java | 3 +- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6446da29071..9c799603df8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.lang.reflect.Constructor; @@ -234,6 +235,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final Consumer<Integer> processTerminator; protected final EventLoopGroup ioEventLoopGroup; + private final ExecutorProvider brokerClientSharedInternalExecutorProvider; + private final ExecutorProvider brokerClientSharedExternalExecutorProvider; + private final Timer brokerClientSharedTimer; private MetricsGenerator metricsGenerator; @@ -326,6 +330,18 @@ public class PulsarService implements AutoCloseable, ShutdownService { this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(), new DefaultThreadFactory("pulsar-io")); + // the internal executor is not used in the broker client or replication clients since this executor is + // used for consumers and the transaction support in the client. + // since an instance is required, a single threaded shared instance is used for all broker client instances + this.brokerClientSharedInternalExecutorProvider = + new ExecutorProvider(1, "broker-client-shared-internal-executor"); + // the external executor is not used in the broker client or replication clients since this executor is + // used for consumer listeners. + // since an instance is required, a single threaded shared instance is used for all broker client instances + this.brokerClientSharedExternalExecutorProvider = + new ExecutorProvider(1, "broker-client-shared-external-executor"); + this.brokerClientSharedTimer = + new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); } public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException { @@ -518,6 +534,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { transactionExecutorProvider.shutdownNow(); } + brokerClientSharedExternalExecutorProvider.shutdownNow(); + brokerClientSharedInternalExecutorProvider.shutdownNow(); + brokerClientSharedTimer.stop(); ioEventLoopGroup.shutdownGracefully(); // add timeout handling for closing executors @@ -1344,6 +1363,17 @@ public class PulsarService implements AutoCloseable, ShutdownService { return this.offloaderScheduler; } + public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) + throws PulsarClientException { + return PulsarClientImpl.builder() + .conf(clientConf) + .eventLoopGroup(ioEventLoopGroup) + .timer(brokerClientSharedTimer) + .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) + .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) + .build(); + } + public synchronized PulsarClient getClient() throws PulsarServerException { if (this.client == null) { try { @@ -1388,7 +1418,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { } conf.setStatsIntervalSeconds(0); - this.client = new PulsarClientImpl(conf, ioEventLoopGroup); + this.client = createClientImpl(conf); } catch (Exception e) { throw new PulsarServerException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fbef655d489..933e750242e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -25,7 +25,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; -import io.netty.channel.EventLoopGroup; import io.prometheus.client.Counter; import java.net.URI; import java.net.URL; @@ -1306,7 +1305,7 @@ public class NamespaceService implements AutoCloseable { // Share all the IO threads across broker and client connections ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); - return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor()); + return pulsar.createClientImpl(conf); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1fbc19657f5..b2dd0785688 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -124,7 +124,6 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -1177,7 +1176,7 @@ public class BrokerService implements Closeable { } // Share all the IO threads across broker and client connections ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); - return new PulsarClientImpl(conf, workerGroup); + return pulsar.createClientImpl(conf); } catch (Exception e) { throw new RuntimeException(e); }
