This is an automated email from the ASF dual-hosted git repository.

addisonj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f455418  Allow config of IO and acceptor threads in proxy (#14054)
f455418 is described below

commit f455418c8e1efcd3895d4dab593aadb36a682165
Author: Addison Higham <[email protected]>
AuthorDate: Sun Jan 30 13:04:26 2022 -0700

    Allow config of IO and acceptor threads in proxy (#14054)
    
    * Allow config of IO and acceptor threads in proxy
    
    Previously, the Pulasr Proxy did not allow configuration of the number
    of IO threads and acceptor threads in the proxy.
    
    These options can be very important to tune, as is tuneable in the
    broker, so this change simply matches the brokers perspective.
    
    Also, we increase the default number of IO threads to 2x number of
    processors instead of 1x, as in a single CPU config, it still makes
    sense to have 2 threads, at least for now, where some blocking
    operatings can happen (such as authn/authz plugins)
    
    * fix checkstyle
---
 .../org/apache/pulsar/proxy/server/ProxyConfiguration.java | 14 ++++++++++++++
 .../java/org/apache/pulsar/proxy/server/ProxyService.java  | 10 +++++-----
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index db52a1b..0769b34 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -520,6 +520,20 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int httpNumThreads = Math.max(8, 2 * 
Runtime.getRuntime().availableProcessors());
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Number of threads to use for Netty IO."
+                    + " Default is set to `2 * 
Runtime.getRuntime().availableProcessors()`"
+    )
+    private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Number of threads to use for Netty Acceptor."
+                    + " Default is set to `1`"
+    )
+    private int numAcceptorThreads = 1;
+
     @Deprecated
     @FieldContext(
             category = CATEGORY_PLUGIN,
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 271c85f..0e0d334 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -103,8 +103,6 @@ public class ProxyService implements Closeable {
 
     private final ScheduledExecutorService statsExecutor;
 
-    private static final int numThreads = 
Runtime.getRuntime().availableProcessors();
-
     static final Gauge ACTIVE_CONNECTIONS = Gauge
             .build("pulsar_proxy_active_connections", "Number of connections 
currently active in the proxy").create()
             .register();
@@ -145,8 +143,10 @@ public class ProxyService implements Closeable {
         } else {
             proxyLogLevel = 0;
         }
-        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, 
acceptorThreadFactory);
-        this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, 
workersThreadFactory);
+        this.acceptorGroup = 
EventLoopUtil.newEventLoopGroup(proxyConfig.getNumAcceptorThreads(),
+                false, acceptorThreadFactory);
+        this.workerGroup = 
EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(),
+                false, workersThreadFactory);
         this.authenticationService = authenticationService;
 
         // Initialize the message protocol handlers
@@ -276,7 +276,7 @@ public class ProxyService implements Closeable {
             EventLoopUtil.enableTriggeredMode(bootstrap);
             DefaultThreadFactory defaultThreadFactory = new 
DefaultThreadFactory("pulsar-ext-" + extensionName);
             EventLoopGroup dedicatedWorkerGroup =
-                    EventLoopUtil.newEventLoopGroup(numThreads, false, 
defaultThreadFactory);
+                    
EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), false, 
defaultThreadFactory);
             extensionsWorkerGroups.add(dedicatedWorkerGroup);
             
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
             bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);

Reply via email to