This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a75a22ddd5fd80d447fc7ed50d07c4a291a057b5
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 20 20:16:19 2022 +0200

    [Broker] Use shared executors for broker and geo-replication clients (#13839)
    
    * [Broker] Use shared executors for broker clients and geo-replication clients
    
    * Remove brokerClientNumIOThreads configuration key and default to 1
    
    * Revisit the shared timer creation
    
    - don't ever make it a daemon thread
    
    (cherry picked from commit 4924e6d54a8fa4fb1a01f48644c23c625d0407f2)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 32 +++++++++++++++++++++-
 .../pulsar/broker/namespace/NamespaceService.java  |  3 +-
 .../pulsar/broker/service/BrokerService.java       |  3 +-
 3 files changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6446da29071..9c799603df8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -234,6 +235,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
     private final Consumer<Integer> processTerminator;
     protected final EventLoopGroup ioEventLoopGroup;
+    private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
+    private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
+    private final Timer brokerClientSharedTimer;
 
     private MetricsGenerator metricsGenerator;
 
@@ -326,6 +330,18 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
         this.ioEventLoopGroup = 
EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), 
config.isEnableBusyWait(),
                 new DefaultThreadFactory("pulsar-io"));
+        // the internal executor is not used in the broker client or replication clients since this executor is
+        // used for consumers and the transaction support in the client.
+        // since an instance is required, a single threaded shared instance is used for all broker client instances
+        this.brokerClientSharedInternalExecutorProvider =
+                new ExecutorProvider(1, 
"broker-client-shared-internal-executor");
+        // the external executor is not used in the broker client or replication clients since this executor is
+        // used for consumer listeners.
+        // since an instance is required, a single threaded shared instance is used for all broker client instances
+        this.brokerClientSharedExternalExecutorProvider =
+                new ExecutorProvider(1, 
"broker-client-shared-external-executor");
+        this.brokerClientSharedTimer =
+                new HashedWheelTimer(new 
DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
     }
 
     public MetadataStore createConfigurationMetadataStore() throws 
MetadataStoreException {
@@ -518,6 +534,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 transactionExecutorProvider.shutdownNow();
             }
 
+            brokerClientSharedExternalExecutorProvider.shutdownNow();
+            brokerClientSharedInternalExecutorProvider.shutdownNow();
+            brokerClientSharedTimer.stop();
             ioEventLoopGroup.shutdownGracefully();
 
             // add timeout handling for closing executors
@@ -1344,6 +1363,17 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return this.offloaderScheduler;
     }
 
+    public PulsarClientImpl createClientImpl(ClientConfigurationData 
clientConf)
+            throws PulsarClientException {
+        return PulsarClientImpl.builder()
+                .conf(clientConf)
+                .eventLoopGroup(ioEventLoopGroup)
+                .timer(brokerClientSharedTimer)
+                
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
+                
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
+                .build();
+    }
+
     public synchronized PulsarClient getClient() throws PulsarServerException {
         if (this.client == null) {
             try {
@@ -1388,7 +1418,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 }
 
                 conf.setStatsIntervalSeconds(0);
-                this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
+                this.client = createClientImpl(conf);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fbef655d489..933e750242e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -25,7 +25,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
-import io.netty.channel.EventLoopGroup;
 import io.prometheus.client.Counter;
 import java.net.URI;
 import java.net.URL;
@@ -1306,7 +1305,7 @@ public class NamespaceService implements AutoCloseable {
 
                 // Share all the IO threads across broker and client 
connections
                 ClientConfigurationData conf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData();
-                return new PulsarClientImpl(conf, (EventLoopGroup) 
pulsar.getBrokerService().executor());
+                return pulsar.createClientImpl(conf);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1fbc19657f5..b2dd0785688 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -124,7 +124,6 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1177,7 +1176,7 @@ public class BrokerService implements Closeable {
                 }
                 // Share all the IO threads across broker and client 
connections
                 ClientConfigurationData conf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData();
-                return new PulsarClientImpl(conf, workerGroup);
+                return pulsar.createClientImpl(conf);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

Reply via email to