This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 06d3755  Configure limited number of threads in Jetty executor (#3207)
06d3755 is described below

commit 06d37556db75d551f8f284c9ee3635aaded232f1
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 20 11:49:39 2018 -0800

    Configure limited number of threads in Jetty executor (#3207)
    
    * Configure limited number of threads in Jetty executor
    
    * Fixed parenthesis position
    
    * Fixed usage of ExecutorThreadPool
    
    * join() -> stop()
    
    * Another fix for ExecutorThreadPool
    
    * Increased default number of threads since these are also shared with 
proxy client
---
 conf/broker.conf                                   |  3 ++
 conf/proxy.conf                                    |  8 ++--
 conf/websocket.conf                                |  5 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java | 13 +++++--
 .../pulsar/broker/web/WebExecutorThreadPool.java   | 44 ++++++++++++++++++++++
 .../org/apache/pulsar/broker/web/WebService.java   | 10 ++---
 .../pulsar/functions/worker/rest/WorkerServer.java | 26 ++++++-------
 .../pulsar/proxy/server/ProxyConfiguration.java    | 15 ++++++--
 .../org/apache/pulsar/proxy/server/WebServer.java  |  7 ++--
 .../pulsar/websocket/service/ProxyServer.java      |  7 ++--
 .../service/WebSocketProxyConfiguration.java       | 18 +++++++--
 11 files changed, 116 insertions(+), 40 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2ff87a3..d987c9b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -46,6 +46,9 @@ advertisedAddress=
 # Number of threads to use for Netty IO. Default is set to 2 * 
Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
+# Number of threads to use for HTTP requests processing. Default is set to 
Runtime.getRuntime().availableProcessors()
+numHttpServerThreads=
+
 # Flag to control features that are meant to be used when running in 
standalone mode
 isRunningStandalone=
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 3c3b027..0e4a117 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -110,7 +110,7 @@ maxConcurrentLookupRequests=50000
 
 ##### --- TLS --- #####
 
-# Deprecated - use servicePortTls and webServicePortTls instead 
+# Deprecated - use servicePortTls and webServicePortTls instead
 tlsEnabledInProxy=false
 
 # Path for the TLS certificate file
@@ -160,6 +160,10 @@ httpReverseProxyConfigs=
 # so that clients see the data as soon as possible.
 httpOutputBufferSize=32768
 
+# Number of threads to use for HTTP requests processing. Default is
+# Runtime.getRuntime().availableProcessors()
+httpNumThreads=
+
 ### --- Token Authentication Provider --- ###
 
 ## Symmetric key
@@ -181,5 +185,3 @@ tokenPublicKey=
 
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
-
-
diff --git a/conf/websocket.conf b/conf/websocket.conf
index ab03e56..6435989 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -49,6 +49,9 @@ clusterName=
 # Number of IO threads in Pulsar Client used in WebSocket proxy
 numIoThreads=8
 
+# Number of threads to use in HTTP server. Default is 
Runtime.getRuntime().availableProcessors()
+numHttpServerThreads=
+
 # Number of connections per Broker in Pulsar Client used in WebSocket proxy
 connectionsPerBroker=8
 
@@ -89,7 +92,7 @@ anonymousUserRole=
 
 ### --- TLS --- ###
 
-# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead 
+# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead
 tlsEnabled=false
 
 # Accept untrusted TLS certificate from client
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 75ee5d2..cee095a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -139,6 +139,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
 
     @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Number of threads to use for HTTP requests processing"
+                + " Default is set to `2 * 
Runtime.getRuntime().availableProcessors()`"
+        )
+    private int numHttpServerThreads = 2 * 
Runtime.getRuntime().availableProcessors();
+
+    @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "Enable the WebSocket API service in broker"
     )
@@ -1111,15 +1118,15 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     public int getBookkeeperHealthCheckIntervalSec() {
         return (int) bookkeeperClientHealthCheckIntervalSeconds;
     }
-    
+
     public Optional<Integer> getBrokerServicePort() {
         return Optional.ofNullable(brokerServicePort);
     }
-    
+
     public Optional<Integer> getBrokerServicePortTls() {
         return Optional.ofNullable(brokerServicePortTls);
     }
-    
+
     public Optional<Integer> getWebServicePort() {
         return Optional.ofNullable(webServicePort);
     }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
new file mode 100644
index 0000000..87437e5
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+
+public class WebExecutorThreadPool extends ExecutorThreadPool {
+
+    private final ThreadFactory threadFactory;
+
+    public WebExecutorThreadPool(String namePrefix) {
+        this(Runtime.getRuntime().availableProcessors(), namePrefix);
+    }
+
+    public WebExecutorThreadPool(int maxThreads, String namePrefix) {
+        super(maxThreads);
+        this.threadFactory = new DefaultThreadFactory(namePrefix);
+    }
+
+    @Override
+    protected Thread newThread(Runnable job) {
+        return threadFactory.newThread(job);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 4ca8fcf..1610779 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -51,7 +51,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.resource.Resource;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
@@ -72,13 +71,14 @@ public class WebService implements AutoCloseable {
     private final PulsarService pulsar;
     private final Server server;
     private final List<Handler> handlers;
-    private final ExecutorThreadPool webServiceExecutor;
+    private final WebExecutorThreadPool webServiceExecutor;
 
     public WebService(PulsarService pulsar) throws PulsarServerException {
         this.handlers = Lists.newArrayList();
         this.pulsar = pulsar;
-        this.webServiceExecutor = new ExecutorThreadPool();
-        this.webServiceExecutor.setName("pulsar-web");
+        this.webServiceExecutor = new WebExecutorThreadPool(
+                pulsar.getConfiguration().getNumHttpServerThreads(),
+                "pulsar-web");
         this.server = new Server(webServiceExecutor);
         List<ServerConnector> connectors = new ArrayList<>();
 
@@ -197,7 +197,7 @@ public class WebService implements AutoCloseable {
     public void close() throws PulsarServerException {
         try {
             server.stop();
-            webServiceExecutor.stop();
+            webServiceExecutor.join();
             log.info("Web service closed");
         } catch (Exception e) {
             throw new PulsarServerException(e);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0e28037..5bc2f4c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.net.BindException;
+import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.TimeZone;
 
 import javax.servlet.DispatcherType;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -35,10 +43,6 @@ import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.Slf4jRequestLog;
-
-import java.net.BindException;
-import java.net.URI;
-import java.security.GeneralSecurityException;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -47,12 +51,9 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 
-import com.google.common.annotations.VisibleForTesting;
-
 @Slf4j
 public class WorkerServer {
 
@@ -60,7 +61,7 @@ public class WorkerServer {
     private final WorkerService workerService;
     private static final String MATCH_ALL = "/*";
     private static final int MAX_CONCURRENT_REQUESTS = 1024;
-    private final ExecutorThreadPool webServerExecutor;
+    private final WebExecutorThreadPool webServerExecutor;
     private Server server;
 
     private static String getErrorMessage(Server server, int port, Exception 
ex) {
@@ -75,8 +76,7 @@ public class WorkerServer {
     public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
-        this.webServerExecutor = new ExecutorThreadPool();
-        this.webServerExecutor.setName("function-web");
+        this.webServerExecutor = new WebExecutorThreadPool("function-web");
         init();
     }
 
@@ -84,7 +84,7 @@ public class WorkerServer {
         server.start();
         log.info("Worker Server started at {}", server.getURI());
     }
-    
+
     private void init() {
         server = new Server(webServerExecutor);
 
@@ -153,7 +153,7 @@ public class WorkerServer {
 
         return contextHandler;
     }
-    
+
     @VisibleForTesting
     public void stop() {
         if (this.server != null) {
@@ -171,5 +171,5 @@ public class WorkerServer {
             }
         }
     }
-    
+
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 717019b..ee30656 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -227,7 +227,7 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
 
     /***** --- TLS --- ****/
 
-    @Deprecated 
+    @Deprecated
     private boolean tlsEnabledInProxy = false;
 
     @FieldContext(
@@ -301,6 +301,13 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int httpOutputBufferSize = 32*1024;
 
+    @FieldContext(
+           minValue = 1,
+           category = CATEGORY_HTTP,
+           doc = "Number of threads to use for HTTP requests processing"
+    )
+    private int httpNumThreads = 2 * 
Runtime.getRuntime().availableProcessors();
+
     @PropertiesContext(
         properties = {
             @PropertyContext(
@@ -338,15 +345,15 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     public Optional<Integer> getServicePortTls() {
         return Optional.ofNullable(servicePortTls);
     }
-    
+
     public Optional<Integer> getWebServicePort() {
         return Optional.ofNullable(webServicePort);
     }
-    
+
     public Optional<Integer> getWebServicePortTls() {
         return Optional.ofNullable(webServicePortTls);
     }
-    
+
     public void setProperties(Properties properties) {
         this.properties = properties;
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 20f6b56..35378c2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.broker.web.JsonMapperProvider;
+import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
@@ -56,7 +57,6 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
@@ -70,7 +70,7 @@ public class WebServer {
     private static final String MATCH_ALL = "/*";
 
     private final Server server;
-    private final ExecutorThreadPool webServiceExecutor;
+    private final WebExecutorThreadPool webServiceExecutor;
     private final AuthenticationService authenticationService;
     private final List<String> servletPaths = Lists.newArrayList();
     private final List<Handler> handlers = Lists.newArrayList();
@@ -79,8 +79,7 @@ public class WebServer {
     private URI serviceURI = null;
 
     public WebServer(ProxyConfiguration config, AuthenticationService 
authenticationService) {
-        this.webServiceExecutor = new ExecutorThreadPool();
-        this.webServiceExecutor.setName("pulsar-external-web");
+        this.webServiceExecutor = new 
WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
         this.server = new Server(webServiceExecutor);
         this.authenticationService = authenticationService;
         this.config = config;
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 1470581..e2cdf43 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -32,6 +32,7 @@ import javax.websocket.DeploymentException;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.web.JsonMapperProvider;
+import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.eclipse.jetty.server.Handler;
@@ -45,7 +46,6 @@ import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
@@ -55,13 +55,12 @@ public class ProxyServer {
     private final Server server;
     private final List<Handler> handlers = Lists.newArrayList();
     private final WebSocketProxyConfiguration conf;
-    private final ExecutorThreadPool executorService;
+    private final WebExecutorThreadPool executorService;
 
     public ProxyServer(WebSocketProxyConfiguration config)
             throws PulsarClientException, MalformedURLException, 
PulsarServerException {
         this.conf = config;
-        executorService = new ExecutorThreadPool();
-        executorService.setName("pulsar-websocket-web");
+        executorService = new 
WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web");
         this.server = new Server(executorService);
         List<ServerConnector> connectors = new ArrayList<>();
 
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 7f714ae..5279ae0 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -91,6 +91,10 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
 
     // Number of IO threads in Pulsar Client used in WebSocket proxy
     private int numIoThreads = Runtime.getRuntime().availableProcessors();
+
+    // Number of threads to use in HTTP server
+    private int numHttpServerThreads = 
Runtime.getRuntime().availableProcessors();
+
     // Number of connections per Broker in Pulsar Client used in WebSocket 
proxy
     private int connectionsPerBroker = 
Runtime.getRuntime().availableProcessors();
     // Time in milliseconds that idle WebSocket session times out
@@ -100,9 +104,9 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
     private String anonymousUserRole = null;
 
     /***** --- TLS --- ****/
-    @Deprecated 
+    @Deprecated
     private boolean tlsEnabled = false;
-    
+
     private boolean brokerClientTlsEnabled = false;
     // Path for the TLS certificate file
     private String tlsCertificateFilePath;
@@ -115,7 +119,7 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
     // Specify whether Client certificates are required for TLS
     // Reject the Connection if the Client Certificate is not trusted.
     private boolean tlsRequireTrustedClientCertOnConnect = false;
-    
+
     private Properties properties = new Properties();
 
     public String getClusterName() {
@@ -296,6 +300,14 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
         this.numIoThreads = numIoThreads;
     }
 
+    public int getNumHttpServerThreads() {
+        return numHttpServerThreads;
+    }
+
+    public void setNumHttpServerThreads(int numHttpServerThreads) {
+        this.numHttpServerThreads = numHttpServerThreads;
+    }
+
     public int getConnectionsPerBroker() {
         return connectionsPerBroker;
     }

Reply via email to