This is an automated email from the ASF dual-hosted git repository.
addisonj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f455418 Allow config of IO and acceptor threads in proxy (#14054)
f455418 is described below
commit f455418c8e1efcd3895d4dab593aadb36a682165
Author: Addison Higham <[email protected]>
AuthorDate: Sun Jan 30 13:04:26 2022 -0700
Allow config of IO and acceptor threads in proxy (#14054)
* Allow config of IO and acceptor threads in proxy
Previously, the Pulsar Proxy did not allow configuration of the number
of IO threads and acceptor threads in the proxy.
These options can be very important to tune, and they are already tunable
in the broker, so this change simply matches the broker's approach.
Also, we increase the default number of IO threads to 2x the number of
processors instead of 1x: even in a single-CPU config it still makes
sense to have 2 threads, at least for now, because some blocking
operations can happen (such as in authn/authz plugins).
* fix checkstyle
---
.../org/apache/pulsar/proxy/server/ProxyConfiguration.java | 14 ++++++++++++++
.../java/org/apache/pulsar/proxy/server/ProxyService.java | 10 +++++-----
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index db52a1b..0769b34 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -520,6 +520,20 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int httpNumThreads = Math.max(8, 2 *
Runtime.getRuntime().availableProcessors());
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Number of threads to use for Netty IO."
+ + " Default is set to `2 *
Runtime.getRuntime().availableProcessors()`"
+ )
+ private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
+
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Number of threads to use for Netty Acceptor."
+ + " Default is set to `1`"
+ )
+ private int numAcceptorThreads = 1;
+
@Deprecated
@FieldContext(
category = CATEGORY_PLUGIN,
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 271c85f..0e0d334 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -103,8 +103,6 @@ public class ProxyService implements Closeable {
private final ScheduledExecutorService statsExecutor;
- private static final int numThreads =
Runtime.getRuntime().availableProcessors();
-
static final Gauge ACTIVE_CONNECTIONS = Gauge
.build("pulsar_proxy_active_connections", "Number of connections
currently active in the proxy").create()
.register();
@@ -145,8 +143,10 @@ public class ProxyService implements Closeable {
} else {
proxyLogLevel = 0;
}
- this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false,
acceptorThreadFactory);
- this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false,
workersThreadFactory);
+ this.acceptorGroup =
EventLoopUtil.newEventLoopGroup(proxyConfig.getNumAcceptorThreads(),
+ false, acceptorThreadFactory);
+ this.workerGroup =
EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(),
+ false, workersThreadFactory);
this.authenticationService = authenticationService;
// Initialize the message protocol handlers
@@ -276,7 +276,7 @@ public class ProxyService implements Closeable {
EventLoopUtil.enableTriggeredMode(bootstrap);
DefaultThreadFactory defaultThreadFactory = new
DefaultThreadFactory("pulsar-ext-" + extensionName);
EventLoopGroup dedicatedWorkerGroup =
- EventLoopUtil.newEventLoopGroup(numThreads, false,
defaultThreadFactory);
+
EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), false,
defaultThreadFactory);
extensionsWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);