This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec382117661 [Broker][Proxy][Function worker] Fix backpressure handling 
in Jetty web server configuration (#14353)
ec382117661 is described below

commit ec3821176612621e24dfbc4345525849a729fb06
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Apr 12 17:55:54 2022 +0300

    [Broker][Proxy][Function worker] Fix backpressure handling in Jetty web 
server configuration (#14353)
    
    * [Broker] Improve Jetty configuration to handle backpressure
    
    - Fix maxConcurrentHttpRequests by using QoSFilter to limit concurrent 
requests
      - previous solution didn't limit concurrent http requests
    - Replace previous hardcoded connection limit of 10000 http connections 
with configurable setting
      - use Jetty's built-in connection limit instead of 
PulsarServerConnector's custom solution
    - Rate limiting should happen at the beginning of the filter chain
    - Let Jetty tune selectors and acceptors based on number of cores
      - JETTY_AVAILABLE_PROCESSORS=n environment variable can be used to 
override the number of cores reported by the OS
        - This is useful when CPU limit isn't set on k8s and the number of 
cores is the number of total cores available on the k8s node
        - you can also use -XX:ActiveProcessorCount=n to make Java's 
Runtime.getRuntime().availableProcessors() return n
    - Make accept queue capacity configurable
    - Make thread pool queue capacity bounded and make it configurable
    
    * [Functions] Add http backpressure handling for Functions worker's http 
server
---
 conf/broker.conf                                   | 15 ++++-
 conf/proxy.conf                                    | 11 ++++
 conf/websocket.conf                                | 12 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 18 +++++
 .../java/org/apache/pulsar/broker/web/Filters.java | 60 +++++++++++++++++
 .../pulsar/broker/web/WebExecutorThreadPool.java   |  9 +--
 .../pulsar/broker/web/PulsarServerConnector.java   | 67 -------------------
 .../org/apache/pulsar/broker/web/WebService.java   | 76 ++++++++++++----------
 .../pulsar/functions/worker/WorkerConfig.java      | 21 ++++++
 pulsar-functions/worker/pom.xml                    |  5 ++
 .../pulsar/functions/worker/rest/WorkerServer.java | 46 ++++++++-----
 .../pulsar/proxy/server/ProxyConfiguration.java    | 21 ++++++
 .../org/apache/pulsar/proxy/server/WebServer.java  | 43 +++++++-----
 pulsar-websocket/pom.xml                           |  5 ++
 .../pulsar/websocket/service/ProxyServer.java      | 28 ++++++--
 .../service/WebSocketProxyConfiguration.java       | 17 +++++
 16 files changed, 307 insertions(+), 147 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8c5448a881d..e128632fcf7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -105,9 +105,6 @@ numCacheExecutorThreadPoolSize=10
 # reduce the number of IO threads and BK client threads to only have few CPU 
cores busy.
 enableBusyWait=false
 
-# Max concurrent web requests
-maxConcurrentHttpRequests=1024
-
 # Flag to control features that are meant to be used when running in 
standalone mode
 isRunningStandalone=
 
@@ -783,6 +780,18 @@ httpRequestsLimitEnabled=false
 # Max HTTP requests per seconds allowed. The excess of requests will be 
rejected with HTTP code 429 (Too many requests)
 httpRequestsMaxPerSecond=100.0
 
+# Capacity for thread pool queue in the HTTP server
+httpServerThreadPoolQueueSize=8192
+
+# Capacity for accept queue in the HTTP server
+httpServerAcceptQueueSize=8192
+
+# Maximum number of inbound http connections. (0 to disable limiting)
+maxHttpServerConnections=2048
+
+# Max concurrent web requests
+maxConcurrentHttpRequests=1024
+
 ### --- BookKeeper Client --- ###
 
 # Metadata service uri that bookkeeper is used for loading corresponding 
metadata driver
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 19d9e69e195..470ce8155cd 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -212,6 +212,17 @@ httpRequestsLimitEnabled=false
 # Max HTTP requests per seconds allowed. The excess of requests will be 
rejected with HTTP code 429 (Too many requests)
 httpRequestsMaxPerSecond=100.0
 
+# Capacity for thread pool queue in the HTTP server
+httpServerThreadPoolQueueSize=8192
+
+# Capacity for accept queue in the HTTP server
+httpServerAcceptQueueSize=8192
+
+# Maximum number of inbound http connections. (0 to disable limiting)
+maxHttpServerConnections=2048
+
+# Max concurrent web requests
+maxConcurrentHttpRequests=1024
 
 ### --- Token Authentication Provider --- ###
 
diff --git a/conf/websocket.conf b/conf/websocket.conf
index eff639838fe..c871b202021 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -67,6 +67,18 @@ webSocketSessionIdleTimeoutMillis=300000
 # The maximum size of a text message during parsing in WebSocket proxy
 webSocketMaxTextFrameSize=1048576
 
+# Capacity for thread pool queue in the HTTP server
+httpServerThreadPoolQueueSize=8192
+
+# Capacity for accept queue in the HTTP server
+httpServerAcceptQueueSize=8192
+
+# Maximum number of inbound http connections. (0 to disable limiting)
+maxHttpServerConnections=2048
+
+# Max concurrent web requests
+maxConcurrentHttpRequests=1024
+
 ### --- Authentication --- ###
 
 # Enable authentication
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index eb3f86ac02e..f28e086a9e9 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -296,6 +296,24 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web 
requests")
     private int maxConcurrentHttpRequests = 1024;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Capacity for thread pool queue in the HTTP server"
+                    + " Default is set to 8192."
+    )
+    private int httpServerThreadPoolQueueSize = 8192;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Capacity for accept queue in the HTTP server"
+                    + " Default is set to 8192."
+    )
+    private int httpServerAcceptQueueSize = 8192;
+
+    @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound 
http connections. "
+            + "(0 to disable limiting)")
+    private int maxHttpServerConnections = 2048;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the 
delayed delivery for messages.")
     private boolean delayedDeliveryEnabled = true;
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
new file mode 100644
index 00000000000..3b6bb721bcc
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.util.EnumSet;
+import java.util.Map;
+import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+
+public class Filters {
+    private static final String MATCH_ALL = "/*";
+
+    /**
+     * Adds a filter instance to the servlet context handler.
+     * The filter will be used for all requests.
+     *
+     * @param context servlet context handler instance
+     * @param filter filter instance
+     */
+    public static void addFilter(ServletContextHandler context, Filter filter) 
{
+        addFilterHolder(context, new FilterHolder(filter));
+    }
+
+    private static void addFilterHolder(ServletContextHandler context, 
FilterHolder filter) {
+        context.addFilter(filter,
+                MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+    }
+
+    /**
+     * Adds a filter to the servlet context handler which gets instantiated 
and configured when the server starts.
+     *
+     * @param context servlet context handler instance
+     * @param filter filter class
+     * @param initParams initialization parameters used for configuring the 
filter instance
+     */
+    public static void addFilterClass(ServletContextHandler context, Class<? 
extends Filter> filter,
+                                      Map<String, String> initParams) {
+        FilterHolder holder = new FilterHolder(filter);
+        holder.setInitParameters(initParams);
+        addFilterHolder(context, holder);
+    }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
index 2a8c9e8e15c..c3cbe7a0ab6 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
@@ -20,18 +20,19 @@ package org.apache.pulsar.broker.web;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.concurrent.ThreadFactory;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 
 public class WebExecutorThreadPool extends ExecutorThreadPool {
 
     private final ThreadFactory threadFactory;
 
-    public WebExecutorThreadPool(String namePrefix) {
-        this(Runtime.getRuntime().availableProcessors(), namePrefix);
+    public WebExecutorThreadPool(int maxThreads, String namePrefix) {
+        this(maxThreads, namePrefix, 8192);
     }
 
-    public WebExecutorThreadPool(int maxThreads, String namePrefix) {
-        super(maxThreads);
+    public WebExecutorThreadPool(int maxThreads, String namePrefix, int 
queueCapacity) {
+        super(maxThreads, Math.min(8, maxThreads), new 
BlockingArrayQueue<>(queueCapacity, queueCapacity));
         this.threadFactory = new DefaultThreadFactory(namePrefix);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java
deleted file mode 100644
index e868f9245c3..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.web;
-
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-
-public class PulsarServerConnector extends ServerConnector {
-
-    // Throttle down the accept rate to limit the number of active TCP 
connections
-    private final Semaphore semaphore = new Semaphore(10000);
-
-    /**
-     * @param server
-     * @param acceptors
-     * @param selectors
-     */
-    public PulsarServerConnector(Server server, int acceptors, int selectors) {
-        super(server, acceptors, selectors);
-    }
-
-    /**
-     * @param server
-     * @param acceptors
-     * @param selectors
-     * @param sslContextFactory
-     */
-    public PulsarServerConnector(Server server, int acceptors, int selectors, 
SslContextFactory sslContextFactory) {
-        super(server, acceptors, selectors, sslContextFactory);
-    }
-
-    @Override
-    public void accept(int acceptorID) throws IOException {
-        try {
-            semaphore.acquire();
-            super.accept(acceptorID);
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    protected void onEndPointClosed(EndPoint endp) {
-        semaphore.release();
-        super.onEndPointClosed(endp);
-    }
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 7e6b1636a5c..bffd82f7b26 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -18,20 +18,22 @@
  */
 package org.apache.pulsar.broker.web;
 
+import static org.apache.pulsar.broker.web.Filters.addFilter;
+import static org.apache.pulsar.broker.web.Filters.addFilterClass;
 import com.google.common.collect.Lists;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.util.ArrayList;
-import java.util.EnumSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import javax.servlet.DispatcherType;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.eclipse.jetty.server.ConnectionLimit;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -42,9 +44,9 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
 import org.eclipse.jetty.util.resource.Resource;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
@@ -67,7 +69,6 @@ public class WebService implements AutoCloseable {
     private final Server server;
     private final List<Handler> handlers;
     private final WebExecutorThreadPool webServiceExecutor;
-    public final int maxConcurrentRequests;
 
     private final ServerConnector httpConnector;
     private final ServerConnector httpsConnector;
@@ -76,16 +77,20 @@ public class WebService implements AutoCloseable {
     public WebService(PulsarService pulsar) throws PulsarServerException {
         this.handlers = Lists.newArrayList();
         this.pulsar = pulsar;
+        ServiceConfiguration config = pulsar.getConfiguration();
         this.webServiceExecutor = new WebExecutorThreadPool(
-                pulsar.getConfiguration().getNumHttpServerThreads(),
-                "pulsar-web");
+                config.getNumHttpServerThreads(),
+                "pulsar-web",
+                config.getHttpServerThreadPoolQueueSize());
         this.server = new Server(webServiceExecutor);
-        this.maxConcurrentRequests = 
pulsar.getConfiguration().getMaxConcurrentHttpRequests();
+        if (config.getMaxHttpServerConnections() > 0) {
+            server.addBean(new 
ConnectionLimit(config.getMaxHttpServerConnections(), server));
+        }
         List<ServerConnector> connectors = new ArrayList<>();
 
-        Optional<Integer> port = pulsar.getConfiguration().getWebServicePort();
+        Optional<Integer> port = config.getWebServicePort();
         if (port.isPresent()) {
-            httpConnector = new PulsarServerConnector(server, 1, 1);
+            httpConnector = new ServerConnector(server);
             httpConnector.setPort(port.get());
             httpConnector.setHost(pulsar.getBindAddress());
             connectors.add(httpConnector);
@@ -93,11 +98,10 @@ public class WebService implements AutoCloseable {
             httpConnector = null;
         }
 
-        Optional<Integer> tlsPort = 
pulsar.getConfiguration().getWebServicePortTls();
+        Optional<Integer> tlsPort = config.getWebServicePortTls();
         if (tlsPort.isPresent()) {
             try {
                 SslContextFactory sslCtxFactory;
-                ServiceConfiguration config = pulsar.getConfiguration();
                 if (config.isTlsEnabledWithKeyStore()) {
                     sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
                             config.getTlsProvider(),
@@ -122,7 +126,7 @@ public class WebService implements AutoCloseable {
                             config.isTlsRequireTrustedClientCertOnConnect(), 
true,
                             config.getTlsCertRefreshCheckDurationSec());
                 }
-                httpsConnector = new PulsarServerConnector(server, 1, 1, 
sslCtxFactory);
+                httpsConnector = new ServerConnector(server, sslCtxFactory);
                 httpsConnector.setPort(tlsPort.get());
                 httpsConnector.setHost(pulsar.getBindAddress());
                 connectors.add(httpsConnector);
@@ -134,7 +138,7 @@ public class WebService implements AutoCloseable {
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of 
file descriptors
-        connectors.forEach(c -> c.setAcceptQueueSize(maxConcurrentRequests / 
connectors.size()));
+        connectors.forEach(c -> 
c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new 
ServerConnector[connectors.size()]));
     }
 
@@ -160,42 +164,42 @@ public class WebService implements AutoCloseable {
             });
         }
 
-        if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
-                || !pulsar.getConfig().isDisableBrokerInterceptors()) {
+        ServiceConfiguration config = pulsar.getConfig();
+
+        if (config.getMaxConcurrentHttpRequests() > 0) {
+            addFilterClass(context, QoSFilter.class, 
Collections.singletonMap("maxRequests",
+                    String.valueOf(config.getMaxConcurrentHttpRequests())));
+        }
+
+        if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
+            addFilter(context,
+                    new 
RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond()));
+        }
+
+        if (!config.getBrokerInterceptors().isEmpty()
+                || !config.isDisableBrokerInterceptors()) {
             ExceptionHandler handler = new ExceptionHandler();
             // Enable PreInterceptFilter only when interceptors are enabled
-            context.addFilter(new FilterHolder(new 
PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
-                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
-            context.addFilter(new FilterHolder(new 
ProcessHandlerFilter(pulsar)),
-                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+            addFilter(context, new 
PreInterceptFilter(pulsar.getBrokerInterceptor(), handler));
+            addFilter(context, new ProcessHandlerFilter(pulsar));
         }
 
         if (requiresAuthentication && 
pulsar.getConfiguration().isAuthenticationEnabled()) {
-            FilterHolder filter = new FilterHolder(new AuthenticationFilter(
+            addFilter(context, new AuthenticationFilter(
                     pulsar.getBrokerService().getAuthenticationService()));
-            context.addFilter(filter, MATCH_ALL, 
EnumSet.allOf(DispatcherType.class));
         }
 
-        if (pulsar.getConfig().isDisableHttpDebugMethods()) {
-            FilterHolder filter = new FilterHolder(new 
DisableDebugHttpMethodFilter(pulsar.getConfig()));
-            context.addFilter(filter, MATCH_ALL, 
EnumSet.allOf(DispatcherType.class));
-        }
-
-        if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
-            context.addFilter(
-                    new FilterHolder(new 
RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond())),
-                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+        if (config.isDisableHttpDebugMethods()) {
+            addFilter(context, new DisableDebugHttpMethodFilter(config));
         }
 
-        if (pulsar.getConfig().getHttpMaxRequestSize() > 0) {
-            context.addFilter(new FilterHolder(
+        if (config.getHttpMaxRequestSize() > 0) {
+            addFilter(context,
                     new MaxRequestSizeFilter(
-                            pulsar.getConfig().getHttpMaxRequestSize())),
-                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+                            config.getHttpMaxRequestSize()));
         }
 
-        FilterHolder responseFilter = new FilterHolder(new 
ResponseHandlerFilter(pulsar));
-        context.addFilter(responseFilter, MATCH_ALL, 
EnumSet.allOf(DispatcherType.class));
+        addFilter(context, new ResponseHandlerFilter(pulsar));
         handlers.add(context);
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 8a247c9cfa5..73ba9511777 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -138,6 +138,27 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
         )
     private double httpRequestsMaxPerSecond = 100.0;
 
+    @FieldContext(category = CATEGORY_WORKER, doc = "Max concurrent web 
requests")
+    private int maxConcurrentHttpRequests = 1024;
+
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Capacity for thread pool queue in the HTTP server"
+                    + " Default is set to 8192."
+    )
+    private int httpServerThreadPoolQueueSize = 8192;
+
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Capacity for accept queue in the HTTP server"
+                    + " Default is set to 8192."
+    )
+    private int httpServerAcceptQueueSize = 8192;
+
+    @FieldContext(category = CATEGORY_WORKER, doc = "Maximum number of inbound 
http connections. "
+            + "(0 to disable limiting)")
+    private int maxHttpServerConnections = 2048;
+
     @FieldContext(
             category = CATEGORY_WORKER,
             required = false,
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index d1eda2acc49..cef4624a91e 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -103,6 +103,11 @@
       <artifactId>jetty-servlet</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlets</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-core</artifactId>
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0c6821fd9e1..feec5e3d4e5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,15 +18,16 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import static org.apache.pulsar.broker.web.Filters.addFilter;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.util.ArrayList;
-import java.util.EnumSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import javax.servlet.DispatcherType;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.broker.web.Filters;
 import org.apache.pulsar.broker.web.JettyRequestLogFactory;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
@@ -35,6 +36,7 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
+import org.eclipse.jetty.server.ConnectionLimit;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -43,9 +45,9 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
@@ -57,7 +59,6 @@ public class WorkerServer {
     private final WorkerService workerService;
     private final AuthenticationService authenticationService;
     private static final String MATCH_ALL = "/*";
-    private static final int MAX_CONCURRENT_REQUESTS = 1024;
     private final WebExecutorThreadPool webServerExecutor;
     private Server server;
 
@@ -68,7 +69,8 @@ public class WorkerServer {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
         this.authenticationService = authenticationService;
-        this.webServerExecutor = new 
WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), 
"function-web");
+        this.webServerExecutor = new 
WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), 
"function-web",
+                this.workerConfig.getHttpServerThreadPoolQueueSize());
         init();
     }
 
@@ -79,9 +81,12 @@ public class WorkerServer {
 
     private void init() {
         server = new Server(webServerExecutor);
+        if (workerConfig.getMaxHttpServerConnections() > 0) {
+            server.addBean(new 
ConnectionLimit(workerConfig.getMaxHttpServerConnections(), server));
+        }
 
         List<ServerConnector> connectors = new ArrayList<>();
-        httpConnector = new ServerConnector(server, 1, 1);
+        httpConnector = new ServerConnector(server);
         httpConnector.setPort(this.workerConfig.getWorkerPort());
         connectors.add(httpConnector);
 
@@ -126,7 +131,7 @@ public class WorkerServer {
                         
this.workerConfig.isTlsRequireTrustedClientCertOnConnect(),
                         true,
                         this.workerConfig.getTlsCertRefreshCheckDurationSec());
-                httpsConnector = new ServerConnector(server, 1, 1, 
sslCtxFactory);
+                httpsConnector = new ServerConnector(server, sslCtxFactory);
                 httpsConnector.setPort(this.workerConfig.getWorkerPortTls());
                 connectors.add(httpsConnector);
             } catch (Exception e) {
@@ -135,7 +140,7 @@ public class WorkerServer {
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of 
file descriptors
-        connectors.forEach(c -> c.setAcceptQueueSize(MAX_CONCURRENT_REQUESTS / 
connectors.size()));
+        connectors.forEach(c -> 
c.setAcceptQueueSize(workerConfig.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new 
ServerConnector[connectors.size()]));
     }
 
@@ -161,22 +166,29 @@ public class WorkerServer {
 
         final ServletHolder apiServlet =
                 new ServletHolder(new ServletContainer(config));
-        contextHandler.addServlet(apiServlet, "/*");
-        if (workerService.getWorkerConfig().isAuthenticationEnabled() && 
requireAuthentication) {
-            FilterHolder filter = new FilterHolder(new 
AuthenticationFilter(authenticationService));
-            contextHandler.addFilter(filter, MATCH_ALL, 
EnumSet.allOf(DispatcherType.class));
-        }
+        contextHandler.addServlet(apiServlet, MATCH_ALL);
+
+        addQosFilterIfNeeded(contextHandler, workerService.getWorkerConfig());
 
         if (workerService.getWorkerConfig().isHttpRequestsLimitEnabled()) {
-            contextHandler.addFilter(
-                    new FilterHolder(
-                            new 
RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond())),
-                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+            addFilter(contextHandler,
+                    new 
RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond()));
+        }
+
+        if (workerService.getWorkerConfig().isAuthenticationEnabled() && 
requireAuthentication) {
+            addFilter(contextHandler, new 
AuthenticationFilter(authenticationService));
         }
 
         return contextHandler;
     }
 
+    private static void addQosFilterIfNeeded(ServletContextHandler context, 
WorkerConfig workerConfig) {
+        if (workerConfig.getMaxConcurrentHttpRequests() > 0) {
+            Filters.addFilterClass(context, QoSFilter.class, 
Collections.singletonMap("maxRequests",
+                    
String.valueOf(workerConfig.getMaxConcurrentHttpRequests())));
+        }
+    }
+
     public void stop() {
         if (this.server != null) {
             try {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e387401dc61..e64d51c161a 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -607,6 +607,27 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int httpNumThreads = Math.max(8, 2 * 
Runtime.getRuntime().availableProcessors());
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web 
requests")
+    private int maxConcurrentHttpRequests = 1024;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Capacity for thread pool queue in the HTTP server."
+                    + " Default is set to 8192."
+    )
+    private int httpServerThreadPoolQueueSize = 8192;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Capacity for accept queue in the HTTP server."
+                    + " Default is set to 8192."
+    )
+    private int httpServerAcceptQueueSize = 8192;
+
+    @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound 
http connections. "
+            + "(0 to disable limiting)")
+    private int maxHttpServerConnections = 2048;
+
     @FieldContext(
             category = CATEGORY_SERVER,
             doc = "Number of threads to use for Netty IO."
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 84b1153b386..35f3c9a709d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -18,16 +18,16 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.apache.pulsar.broker.web.Filters.addFilter;
+import static org.apache.pulsar.broker.web.Filters.addFilterClass;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
-import javax.servlet.DispatcherType;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.eclipse.jetty.server.ConnectionLimit;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.HttpConfiguration;
@@ -48,9 +49,9 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
@@ -77,8 +78,12 @@ public class WebServer {
     private ServerConnector connectorTls;
 
     public WebServer(ProxyConfiguration config, AuthenticationService 
authenticationService) {
-        this.webServiceExecutor = new 
WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
+        this.webServiceExecutor = new 
WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web",
+                config.getHttpServerThreadPoolQueueSize());
         this.server = new Server(webServiceExecutor);
+        if (config.getMaxHttpServerConnections() > 0) {
+            server.addBean(new 
ConnectionLimit(config.getMaxHttpServerConnections(), server));
+        }
         this.authenticationService = authenticationService;
         this.config = config;
 
@@ -89,7 +94,7 @@ public class WebServer {
 
         if (config.getWebServicePort().isPresent()) {
             this.externalServicePort = config.getWebServicePort().get();
-            connector = new ServerConnector(server, 1, 1, new 
HttpConnectionFactory(httpConfig));
+            connector = new ServerConnector(server, new 
HttpConnectionFactory(httpConfig));
             connector.setHost(config.getBindAddress());
             connector.setPort(externalServicePort);
             connectors.add(connector);
@@ -122,7 +127,7 @@ public class WebServer {
                             true,
                             config.getTlsCertRefreshCheckDurationSec());
                 }
-                connectorTls = new ServerConnector(server, 1, 1, 
sslCtxFactory);
+                connectorTls = new ServerConnector(server, sslCtxFactory);
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectorTls.setHost(config.getBindAddress());
                 connectors.add(connectorTls);
@@ -132,7 +137,7 @@ public class WebServer {
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of 
file descriptors
-        connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / 
connectors.size()));
+        connectors.stream().forEach(c -> 
c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new 
ServerConnector[connectors.size()]));
     }
 
@@ -159,23 +164,31 @@ public class WebServer {
 
         ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
         context.setContextPath(basePath);
-        context.addServlet(servletHolder, "/*");
+        context.addServlet(servletHolder, MATCH_ALL);
         for (Pair<String, Object> attribute : attributes) {
             context.setAttribute(attribute.getLeft(), attribute.getRight());
         }
-        if (config.isAuthenticationEnabled() && requireAuthentication) {
-            FilterHolder filter = new FilterHolder(new 
AuthenticationFilter(authenticationService));
-            context.addFilter(filter, MATCH_ALL, 
EnumSet.allOf(DispatcherType.class));
-        }
+
+        addQosFilterIfNeeded(context);
 
         if (config.isHttpRequestsLimitEnabled()) {
-            context.addFilter(new FilterHolder(new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())), MATCH_ALL,
-                    EnumSet.allOf(DispatcherType.class));
+            addFilter(context, new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond()));
+        }
+
+        if (config.isAuthenticationEnabled() && requireAuthentication) {
+            addFilter(context, new 
AuthenticationFilter(authenticationService));
         }
 
         handlers.add(context);
     }
 
+    private void addQosFilterIfNeeded(ServletContextHandler context) {
+        if (config.getMaxConcurrentHttpRequests() > 0) {
+            addFilterClass(context, QoSFilter.class, 
Collections.singletonMap("maxRequests",
+                    String.valueOf(config.getMaxConcurrentHttpRequests())));
+        }
+    }
+
     public void addRestResources(String basePath, String javaPackages, String 
attribute, Object attributeValue) {
         ResourceConfig config = new ResourceConfig();
         config.packages("jersey.config.server.provider.packages", 
javaPackages);
@@ -184,7 +197,7 @@ public class WebServer {
         servletHolder.setAsyncSupported(true);
         ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
         context.setContextPath(basePath);
-        context.addServlet(servletHolder, "/*");
+        context.addServlet(servletHolder, MATCH_ALL);
         context.setAttribute(attribute, attributeValue);
         handlers.add(context);
     }
diff --git a/pulsar-websocket/pom.xml b/pulsar-websocket/pom.xml
index bf61e3be185..40245300844 100644
--- a/pulsar-websocket/pom.xml
+++ b/pulsar-websocket/pom.xml
@@ -117,6 +117,11 @@
       <artifactId>javax-websocket-client-impl</artifactId>
       <version>${jetty.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlets</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.hdrhistogram</groupId>
       <artifactId>HdrHistogram</artifactId>
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 6a22e984827..924f8226f56 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.websocket.service;
 
+import static org.apache.pulsar.broker.web.Filters.addFilterClass;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -33,6 +35,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.eclipse.jetty.server.ConnectionLimit;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -42,6 +45,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
@@ -49,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ProxyServer {
+    private static final String MATCH_ALL = "/*";
     private final Server server;
     private final List<Handler> handlers = new ArrayList<>();
     private final WebSocketProxyConfiguration conf;
@@ -60,8 +65,12 @@ public class ProxyServer {
     public ProxyServer(WebSocketProxyConfiguration config)
             throws PulsarClientException, MalformedURLException, 
PulsarServerException {
         this.conf = config;
-        executorService = new 
WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web");
+        executorService = new 
WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web",
+                config.getHttpServerThreadPoolQueueSize());
         this.server = new Server(executorService);
+        if (config.getMaxHttpServerConnections() > 0) {
+            server.addBean(new 
ConnectionLimit(config.getMaxHttpServerConnections(), server));
+        }
         List<ServerConnector> connectors = new ArrayList<>();
 
         if (config.getWebServicePort().isPresent()) {
@@ -80,7 +89,7 @@ public class ProxyServer {
                         config.isTlsRequireTrustedClientCertOnConnect(),
                         true,
                         config.getTlsCertRefreshCheckDurationSec());
-                connectorTls = new ServerConnector(server, -1, -1, 
sslCtxFactory);
+                connectorTls = new ServerConnector(server, sslCtxFactory);
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectors.add(connectorTls);
             } catch (Exception e) {
@@ -90,7 +99,7 @@ public class ProxyServer {
 
         // Limit number of concurrent HTTP connections to avoid getting out of
         // file descriptors
-        connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / 
connectors.size()));
+        connectors.stream().forEach(c -> 
c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new 
ServerConnector[connectors.size()]));
     }
 
@@ -99,7 +108,8 @@ public class ProxyServer {
         ServletHolder servletHolder = new ServletHolder("ws-events", 
socketServlet);
         ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
         context.setContextPath(basePath);
-        context.addServlet(servletHolder, "/*");
+        context.addServlet(servletHolder, MATCH_ALL);
+        addQosFilterIfNeeded(context);
         handlers.add(context);
     }
 
@@ -111,11 +121,19 @@ public class ProxyServer {
         servletHolder.setAsyncSupported(true);
         ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
         context.setContextPath(basePath);
-        context.addServlet(servletHolder, "/*");
+        context.addServlet(servletHolder, MATCH_ALL);
         context.setAttribute(attribute, attributeValue);
+        addQosFilterIfNeeded(context);
         handlers.add(context);
     }
 
+    private void addQosFilterIfNeeded(ServletContextHandler context) {
+        if (conf.getMaxConcurrentHttpRequests() > 0) {
+            addFilterClass(context, QoSFilter.class, 
Collections.singletonMap("maxRequests",
+                    String.valueOf(conf.getMaxConcurrentHttpRequests())));
+        }
+    }
+
     public void start() throws PulsarServerException {
         log.info("Starting web socket proxy at port {}", 
Arrays.stream(server.getConnectors())
                 
.map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index e69400ef04a..0b29dd15e07 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -135,6 +135,23 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
     @FieldContext(doc = "Number of threads used by Websocket service")
     private int webSocketNumServiceThreads = 20;
 
+    @FieldContext(doc = "Max concurrent web requests")
+    private int maxConcurrentHttpRequests = 1024;
+
+    @FieldContext(doc = "Capacity for thread pool queue in the HTTP server."
+                    + " Default is set to 8192."
+    )
+    private int httpServerThreadPoolQueueSize = 8192;
+
+    @FieldContext(doc = "Capacity for accept queue in the HTTP server."
+                    + " Default is set to 8192."
+    )
+    private int httpServerAcceptQueueSize = 8192;
+
+    @FieldContext(doc = "Maximum number of inbound http connections. "
+            + "(0 to disable limiting)")
+    private int maxHttpServerConnections = 2048;
+
     @FieldContext(doc = "Number of connections per broker in Pulsar client 
used in WebSocket proxy")
     private int webSocketConnectionsPerBroker = 
Runtime.getRuntime().availableProcessors();
 

Reply via email to