merlimat closed pull request #3207: Configure limited number of threads in Jetty executor
URL: https://github.com/apache/pulsar/pull/3207
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff once such a PR is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/conf/broker.conf b/conf/broker.conf
index 2ff87a3c61..d987c9be9e 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -46,6 +46,9 @@ advertisedAddress=
 # Number of threads to use for Netty IO. Default is set to 2 * 
Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
+# Number of threads to use for HTTP requests processing. Default is set to 2 * 
Runtime.getRuntime().availableProcessors()
+numHttpServerThreads=
+
 # Flag to control features that are meant to be used when running in 
standalone mode
 isRunningStandalone=
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 3c3b027f04..0e4a11762a 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -110,7 +110,7 @@ maxConcurrentLookupRequests=50000
 
 ##### --- TLS --- #####
 
-# Deprecated - use servicePortTls and webServicePortTls instead 
+# Deprecated - use servicePortTls and webServicePortTls instead
 tlsEnabledInProxy=false
 
 # Path for the TLS certificate file
@@ -160,6 +160,10 @@ httpReverseProxyConfigs=
 # so that clients see the data as soon as possible.
 httpOutputBufferSize=32768
 
+# Number of threads to use for HTTP requests processing. Default is
+# 2 * Runtime.getRuntime().availableProcessors()
+httpNumThreads=
+
 ### --- Token Authentication Provider --- ###
 
 ## Symmetric key
@@ -181,5 +185,3 @@ tokenPublicKey=
 
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
-
-
diff --git a/conf/websocket.conf b/conf/websocket.conf
index ab03e5687d..6435989d05 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -49,6 +49,9 @@ clusterName=
 # Number of IO threads in Pulsar Client used in WebSocket proxy
 numIoThreads=8
 
+# Number of threads to use in HTTP server. Default is 
Runtime.getRuntime().availableProcessors()
+numHttpServerThreads=
+
 # Number of connections per Broker in Pulsar Client used in WebSocket proxy
 connectionsPerBroker=8
 
@@ -89,7 +92,7 @@ anonymousUserRole=
 
 ### --- TLS --- ###
 
-# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead 
+# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead
 tlsEnabled=false
 
 # Accept untrusted TLS certificate from client
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 75ee5d22e1..cee095a087 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -138,6 +138,13 @@
     )
     private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Number of threads to use for HTTP requests processing"
+                + " Default is set to `2 * 
Runtime.getRuntime().availableProcessors()`"
+        )
+    private int numHttpServerThreads = 2 * 
Runtime.getRuntime().availableProcessors();
+
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "Enable the WebSocket API service in broker"
@@ -1111,15 +1118,15 @@ public void setProperties(Properties properties) {
     public int getBookkeeperHealthCheckIntervalSec() {
         return (int) bookkeeperClientHealthCheckIntervalSeconds;
     }
-    
+
     public Optional<Integer> getBrokerServicePort() {
         return Optional.ofNullable(brokerServicePort);
     }
-    
+
     public Optional<Integer> getBrokerServicePortTls() {
         return Optional.ofNullable(brokerServicePortTls);
     }
-    
+
     public Optional<Integer> getWebServicePort() {
         return Optional.ofNullable(webServicePort);
     }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
new file mode 100644
index 0000000000..87437e510d
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+
+public class WebExecutorThreadPool extends ExecutorThreadPool {
+
+    private final ThreadFactory threadFactory;
+
+    public WebExecutorThreadPool(String namePrefix) {
+        this(Runtime.getRuntime().availableProcessors(), namePrefix);
+    }
+
+    public WebExecutorThreadPool(int maxThreads, String namePrefix) {
+        super(maxThreads);
+        this.threadFactory = new DefaultThreadFactory(namePrefix);
+    }
+
+    @Override
+    protected Thread newThread(Runnable job) {
+        return threadFactory.newThread(job);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 4ca8fcf70a..1610779025 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -51,7 +51,6 @@
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.resource.Resource;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
@@ -72,13 +71,14 @@
     private final PulsarService pulsar;
     private final Server server;
     private final List<Handler> handlers;
-    private final ExecutorThreadPool webServiceExecutor;
+    private final WebExecutorThreadPool webServiceExecutor;
 
     public WebService(PulsarService pulsar) throws PulsarServerException {
         this.handlers = Lists.newArrayList();
         this.pulsar = pulsar;
-        this.webServiceExecutor = new ExecutorThreadPool();
-        this.webServiceExecutor.setName("pulsar-web");
+        this.webServiceExecutor = new WebExecutorThreadPool(
+                pulsar.getConfiguration().getNumHttpServerThreads(),
+                "pulsar-web");
         this.server = new Server(webServiceExecutor);
         List<ServerConnector> connectors = new ArrayList<>();
 
@@ -197,7 +197,7 @@ public void start() throws PulsarServerException {
     public void close() throws PulsarServerException {
         try {
             server.stop();
-            webServiceExecutor.stop();
+            webServiceExecutor.join();
             log.info("Web service closed");
         } catch (Exception e) {
             throw new PulsarServerException(e);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0e280377e2..5bc2f4cd4d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.net.BindException;
+import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.TimeZone;
 
 import javax.servlet.DispatcherType;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -35,10 +43,6 @@
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.Slf4jRequestLog;
-
-import java.net.BindException;
-import java.net.URI;
-import java.security.GeneralSecurityException;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -47,12 +51,9 @@
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 
-import com.google.common.annotations.VisibleForTesting;
-
 @Slf4j
 public class WorkerServer {
 
@@ -60,7 +61,7 @@
     private final WorkerService workerService;
     private static final String MATCH_ALL = "/*";
     private static final int MAX_CONCURRENT_REQUESTS = 1024;
-    private final ExecutorThreadPool webServerExecutor;
+    private final WebExecutorThreadPool webServerExecutor;
     private Server server;
 
     private static String getErrorMessage(Server server, int port, Exception 
ex) {
@@ -75,8 +76,7 @@ private static String getErrorMessage(Server server, int 
port, Exception ex) {
     public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
-        this.webServerExecutor = new ExecutorThreadPool();
-        this.webServerExecutor.setName("function-web");
+        this.webServerExecutor = new WebExecutorThreadPool("function-web");
         init();
     }
 
@@ -84,7 +84,7 @@ public void start() throws Exception {
         server.start();
         log.info("Worker Server started at {}", server.getURI());
     }
-    
+
     private void init() {
         server = new Server(webServerExecutor);
 
@@ -153,7 +153,7 @@ public static ServletContextHandler 
newServletContextHandler(String contextPath,
 
         return contextHandler;
     }
-    
+
     @VisibleForTesting
     public void stop() {
         if (this.server != null) {
@@ -171,5 +171,5 @@ public void stop() {
             }
         }
     }
-    
+
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 717019b8df..ee30656e4d 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -227,7 +227,7 @@
 
     /***** --- TLS --- ****/
 
-    @Deprecated 
+    @Deprecated
     private boolean tlsEnabledInProxy = false;
 
     @FieldContext(
@@ -301,6 +301,13 @@
     )
     private int httpOutputBufferSize = 32*1024;
 
+    @FieldContext(
+           minValue = 1,
+           category = CATEGORY_HTTP,
+           doc = "Number of threads to use for HTTP requests processing"
+    )
+    private int httpNumThreads = 2 * 
Runtime.getRuntime().availableProcessors();
+
     @PropertiesContext(
         properties = {
             @PropertyContext(
@@ -338,15 +345,15 @@ public Properties getProperties() {
     public Optional<Integer> getServicePortTls() {
         return Optional.ofNullable(servicePortTls);
     }
-    
+
     public Optional<Integer> getWebServicePort() {
         return Optional.ofNullable(webServicePort);
     }
-    
+
     public Optional<Integer> getWebServicePortTls() {
         return Optional.ofNullable(webServicePortTls);
     }
-    
+
     public void setProperties(Properties properties) {
         this.properties = properties;
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 20f6b56a90..35378c2294 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -39,6 +39,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.broker.web.JsonMapperProvider;
+import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
@@ -56,7 +57,6 @@
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
@@ -70,7 +70,7 @@
     private static final String MATCH_ALL = "/*";
 
     private final Server server;
-    private final ExecutorThreadPool webServiceExecutor;
+    private final WebExecutorThreadPool webServiceExecutor;
     private final AuthenticationService authenticationService;
     private final List<String> servletPaths = Lists.newArrayList();
     private final List<Handler> handlers = Lists.newArrayList();
@@ -79,8 +79,7 @@
     private URI serviceURI = null;
 
     public WebServer(ProxyConfiguration config, AuthenticationService 
authenticationService) {
-        this.webServiceExecutor = new ExecutorThreadPool();
-        this.webServiceExecutor.setName("pulsar-external-web");
+        this.webServiceExecutor = new 
WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
         this.server = new Server(webServiceExecutor);
         this.authenticationService = authenticationService;
         this.config = config;
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 147058170c..e2cdf43195 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -32,6 +32,7 @@
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.web.JsonMapperProvider;
+import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.eclipse.jetty.server.Handler;
@@ -45,7 +46,6 @@
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
@@ -55,13 +55,12 @@
     private final Server server;
     private final List<Handler> handlers = Lists.newArrayList();
     private final WebSocketProxyConfiguration conf;
-    private final ExecutorThreadPool executorService;
+    private final WebExecutorThreadPool executorService;
 
     public ProxyServer(WebSocketProxyConfiguration config)
             throws PulsarClientException, MalformedURLException, 
PulsarServerException {
         this.conf = config;
-        executorService = new ExecutorThreadPool();
-        executorService.setName("pulsar-websocket-web");
+        executorService = new 
WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web");
         this.server = new Server(executorService);
         List<ServerConnector> connectors = new ArrayList<>();
 
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 7f714ae7ac..5279ae09a4 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -91,6 +91,10 @@
 
     // Number of IO threads in Pulsar Client used in WebSocket proxy
     private int numIoThreads = Runtime.getRuntime().availableProcessors();
+
+    // Number of threads to use in HTTP server
+    private int numHttpServerThreads = 
Runtime.getRuntime().availableProcessors();
+
     // Number of connections per Broker in Pulsar Client used in WebSocket 
proxy
     private int connectionsPerBroker = 
Runtime.getRuntime().availableProcessors();
     // Time in milliseconds that idle WebSocket session times out
@@ -100,9 +104,9 @@
     private String anonymousUserRole = null;
 
     /***** --- TLS --- ****/
-    @Deprecated 
+    @Deprecated
     private boolean tlsEnabled = false;
-    
+
     private boolean brokerClientTlsEnabled = false;
     // Path for the TLS certificate file
     private String tlsCertificateFilePath;
@@ -115,7 +119,7 @@
     // Specify whether Client certificates are required for TLS
     // Reject the Connection if the Client Certificate is not trusted.
     private boolean tlsRequireTrustedClientCertOnConnect = false;
-    
+
     private Properties properties = new Properties();
 
     public String getClusterName() {
@@ -296,6 +300,14 @@ public void setNumIoThreads(int numIoThreads) {
         this.numIoThreads = numIoThreads;
     }
 
+    public int getNumHttpServerThreads() {
+        return numHttpServerThreads;
+    }
+
+    public void setNumHttpServerThreads(int numHttpServerThreads) {
+        this.numHttpServerThreads = numHttpServerThreads;
+    }
+
     public int getConnectionsPerBroker() {
         return connectionsPerBroker;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to