This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ec382117661 [Broker][Proxy][Function worker] Fix backpressure handling
in Jetty web server configuration (#14353)
ec382117661 is described below
commit ec3821176612621e24dfbc4345525849a729fb06
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Apr 12 17:55:54 2022 +0300
[Broker][Proxy][Function worker] Fix backpressure handling in Jetty web
server configuration (#14353)
* [Broker] Improve Jetty configuration to handle backpressure
- Fix maxConcurrentHttpRequests by using QoSFilter to limit concurrent
requests
- previous solution didn't limit concurrent http requests
- Replace previous hardcoded connection limit of 10000 http connections
with configurable setting
- use Jetty's built-in connection limit instead of
PulsarServerConnector's custom solution
- Rate limiting should happen at the beginning of the filter chain
- Let Jetty tune selectors and acceptors based on number of cores
- JETTY_AVAILABLE_PROCESSORS=n environment variable can be used to
override the number of cores reported by the OS
- This is useful when a CPU limit isn't set on k8s and the reported number of
cores is the total number of cores available on the k8s node
- you can also use -XX:ActiveProcessorCount=n to make Java's
Runtime.getRuntime().availableProcessors() return n
- Make accept queue capacity configurable
- Make thread pool queue capacity bounded and make it configurable
* [Functions] Add http backpressure handling for Functions worker's http
server
---
conf/broker.conf | 15 ++++-
conf/proxy.conf | 11 ++++
conf/websocket.conf | 12 ++++
.../apache/pulsar/broker/ServiceConfiguration.java | 18 +++++
.../java/org/apache/pulsar/broker/web/Filters.java | 60 +++++++++++++++++
.../pulsar/broker/web/WebExecutorThreadPool.java | 9 +--
.../pulsar/broker/web/PulsarServerConnector.java | 67 -------------------
.../org/apache/pulsar/broker/web/WebService.java | 76 ++++++++++++----------
.../pulsar/functions/worker/WorkerConfig.java | 21 ++++++
pulsar-functions/worker/pom.xml | 5 ++
.../pulsar/functions/worker/rest/WorkerServer.java | 46 ++++++++-----
.../pulsar/proxy/server/ProxyConfiguration.java | 21 ++++++
.../org/apache/pulsar/proxy/server/WebServer.java | 43 +++++++-----
pulsar-websocket/pom.xml | 5 ++
.../pulsar/websocket/service/ProxyServer.java | 28 ++++++--
.../service/WebSocketProxyConfiguration.java | 17 +++++
16 files changed, 307 insertions(+), 147 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 8c5448a881d..e128632fcf7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -105,9 +105,6 @@ numCacheExecutorThreadPoolSize=10
# reduce the number of IO threads and BK client threads to only have few CPU
cores busy.
enableBusyWait=false
-# Max concurrent web requests
-maxConcurrentHttpRequests=1024
-
# Flag to control features that are meant to be used when running in
standalone mode
isRunningStandalone=
@@ -783,6 +780,18 @@ httpRequestsLimitEnabled=false
# Max HTTP requests per seconds allowed. The excess of requests will be
rejected with HTTP code 429 (Too many requests)
httpRequestsMaxPerSecond=100.0
+# Capacity for thread pool queue in the HTTP server
+httpServerThreadPoolQueueSize=8192
+
+# Capacity for accept queue in the HTTP server
+httpServerAcceptQueueSize=8192
+
+# Maximum number of inbound http connections. (0 to disable limiting)
+maxHttpServerConnections=2048
+
+# Max concurrent web requests
+maxConcurrentHttpRequests=1024
+
### --- BookKeeper Client --- ###
# Metadata service uri that bookkeeper is used for loading corresponding
metadata driver
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 19d9e69e195..470ce8155cd 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -212,6 +212,17 @@ httpRequestsLimitEnabled=false
# Max HTTP requests per seconds allowed. The excess of requests will be
rejected with HTTP code 429 (Too many requests)
httpRequestsMaxPerSecond=100.0
+# Capacity for thread pool queue in the HTTP server
+httpServerThreadPoolQueueSize=8192
+
+# Capacity for accept queue in the HTTP server
+httpServerAcceptQueueSize=8192
+
+# Maximum number of inbound http connections. (0 to disable limiting)
+maxHttpServerConnections=2048
+
+# Max concurrent web requests
+maxConcurrentHttpRequests=1024
### --- Token Authentication Provider --- ###
diff --git a/conf/websocket.conf b/conf/websocket.conf
index eff639838fe..c871b202021 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -67,6 +67,18 @@ webSocketSessionIdleTimeoutMillis=300000
# The maximum size of a text message during parsing in WebSocket proxy
webSocketMaxTextFrameSize=1048576
+# Capacity for thread pool queue in the HTTP server
+httpServerThreadPoolQueueSize=8192
+
+# Capacity for accept queue in the HTTP server
+httpServerAcceptQueueSize=8192
+
+# Maximum number of inbound http connections. (0 to disable limiting)
+maxHttpServerConnections=2048
+
+# Max concurrent web requests
+maxConcurrentHttpRequests=1024
+
### --- Authentication --- ###
# Enable authentication
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index eb3f86ac02e..f28e086a9e9 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -296,6 +296,24 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web
requests")
private int maxConcurrentHttpRequests = 1024;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Capacity for thread pool queue in the HTTP server"
+ + " Default is set to 8192."
+ )
+ private int httpServerThreadPoolQueueSize = 8192;
+
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Capacity for accept queue in the HTTP server"
+ + " Default is set to 8192."
+ )
+ private int httpServerAcceptQueueSize = 8192;
+
+ @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound
http connections. "
+ + "(0 to disable limiting)")
+ private int maxHttpServerConnections = 2048;
+
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the
delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
new file mode 100644
index 00000000000..3b6bb721bcc
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.util.EnumSet;
+import java.util.Map;
+import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+
+public class Filters {
+ private static final String MATCH_ALL = "/*";
+
+ /**
+ * Adds a filter instance to the servlet context handler.
+ * The filter will be used for all requests.
+ *
+ * @param context servlet context handler instance
+ * @param filter filter instance
+ */
+ public static void addFilter(ServletContextHandler context, Filter filter)
{
+ addFilterHolder(context, new FilterHolder(filter));
+ }
+
+ private static void addFilterHolder(ServletContextHandler context,
FilterHolder filter) {
+ context.addFilter(filter,
+ MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ }
+
+ /**
+ * Adds a filter to the servlet context handler which gets instantiated
and configured when the server starts.
+ *
+ * @param context servlet context handler instance
+ * @param filter filter class
+ * @param initParams initialization parameters used for configuring the
filter instance
+ */
+ public static void addFilterClass(ServletContextHandler context, Class<?
extends Filter> filter,
+ Map<String, String> initParams) {
+ FilterHolder holder = new FilterHolder(filter);
+ holder.setInitParameters(initParams);
+ addFilterHolder(context, holder);
+ }
+}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
index 2a8c9e8e15c..c3cbe7a0ab6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java
@@ -20,18 +20,19 @@ package org.apache.pulsar.broker.web;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
public class WebExecutorThreadPool extends ExecutorThreadPool {
private final ThreadFactory threadFactory;
- public WebExecutorThreadPool(String namePrefix) {
- this(Runtime.getRuntime().availableProcessors(), namePrefix);
+ public WebExecutorThreadPool(int maxThreads, String namePrefix) {
+ this(maxThreads, namePrefix, 8192);
}
- public WebExecutorThreadPool(int maxThreads, String namePrefix) {
- super(maxThreads);
+ public WebExecutorThreadPool(int maxThreads, String namePrefix, int
queueCapacity) {
+ super(maxThreads, Math.min(8, maxThreads), new
BlockingArrayQueue<>(queueCapacity, queueCapacity));
this.threadFactory = new DefaultThreadFactory(namePrefix);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java
deleted file mode 100644
index e868f9245c3..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.web;
-
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-
-public class PulsarServerConnector extends ServerConnector {
-
- // Throttle down the accept rate to limit the number of active TCP
connections
- private final Semaphore semaphore = new Semaphore(10000);
-
- /**
- * @param server
- * @param acceptors
- * @param selectors
- */
- public PulsarServerConnector(Server server, int acceptors, int selectors) {
- super(server, acceptors, selectors);
- }
-
- /**
- * @param server
- * @param acceptors
- * @param selectors
- * @param sslContextFactory
- */
- public PulsarServerConnector(Server server, int acceptors, int selectors,
SslContextFactory sslContextFactory) {
- super(server, acceptors, selectors, sslContextFactory);
- }
-
- @Override
- public void accept(int acceptorID) throws IOException {
- try {
- semaphore.acquire();
- super.accept(acceptorID);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- protected void onEndPointClosed(EndPoint endp) {
- semaphore.release();
- super.onEndPointClosed(endp);
- }
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 7e6b1636a5c..bffd82f7b26 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -18,20 +18,22 @@
*/
package org.apache.pulsar.broker.web;
+import static org.apache.pulsar.broker.web.Filters.addFilter;
+import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import com.google.common.collect.Lists;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
-import java.util.EnumSet;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import javax.servlet.DispatcherType;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -42,9 +44,9 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
@@ -67,7 +69,6 @@ public class WebService implements AutoCloseable {
private final Server server;
private final List<Handler> handlers;
private final WebExecutorThreadPool webServiceExecutor;
- public final int maxConcurrentRequests;
private final ServerConnector httpConnector;
private final ServerConnector httpsConnector;
@@ -76,16 +77,20 @@ public class WebService implements AutoCloseable {
public WebService(PulsarService pulsar) throws PulsarServerException {
this.handlers = Lists.newArrayList();
this.pulsar = pulsar;
+ ServiceConfiguration config = pulsar.getConfiguration();
this.webServiceExecutor = new WebExecutorThreadPool(
- pulsar.getConfiguration().getNumHttpServerThreads(),
- "pulsar-web");
+ config.getNumHttpServerThreads(),
+ "pulsar-web",
+ config.getHttpServerThreadPoolQueueSize());
this.server = new Server(webServiceExecutor);
- this.maxConcurrentRequests =
pulsar.getConfiguration().getMaxConcurrentHttpRequests();
+ if (config.getMaxHttpServerConnections() > 0) {
+ server.addBean(new
ConnectionLimit(config.getMaxHttpServerConnections(), server));
+ }
List<ServerConnector> connectors = new ArrayList<>();
- Optional<Integer> port = pulsar.getConfiguration().getWebServicePort();
+ Optional<Integer> port = config.getWebServicePort();
if (port.isPresent()) {
- httpConnector = new PulsarServerConnector(server, 1, 1);
+ httpConnector = new ServerConnector(server);
httpConnector.setPort(port.get());
httpConnector.setHost(pulsar.getBindAddress());
connectors.add(httpConnector);
@@ -93,11 +98,10 @@ public class WebService implements AutoCloseable {
httpConnector = null;
}
- Optional<Integer> tlsPort =
pulsar.getConfiguration().getWebServicePortTls();
+ Optional<Integer> tlsPort = config.getWebServicePortTls();
if (tlsPort.isPresent()) {
try {
SslContextFactory sslCtxFactory;
- ServiceConfiguration config = pulsar.getConfiguration();
if (config.isTlsEnabledWithKeyStore()) {
sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
config.getTlsProvider(),
@@ -122,7 +126,7 @@ public class WebService implements AutoCloseable {
config.isTlsRequireTrustedClientCertOnConnect(),
true,
config.getTlsCertRefreshCheckDurationSec());
}
- httpsConnector = new PulsarServerConnector(server, 1, 1,
sslCtxFactory);
+ httpsConnector = new ServerConnector(server, sslCtxFactory);
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
connectors.add(httpsConnector);
@@ -134,7 +138,7 @@ public class WebService implements AutoCloseable {
}
// Limit number of concurrent HTTP connections to avoid getting out of
file descriptors
- connectors.forEach(c -> c.setAcceptQueueSize(maxConcurrentRequests /
connectors.size()));
+ connectors.forEach(c ->
c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new
ServerConnector[connectors.size()]));
}
@@ -160,42 +164,42 @@ public class WebService implements AutoCloseable {
});
}
- if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
- || !pulsar.getConfig().isDisableBrokerInterceptors()) {
+ ServiceConfiguration config = pulsar.getConfig();
+
+ if (config.getMaxConcurrentHttpRequests() > 0) {
+ addFilterClass(context, QoSFilter.class,
Collections.singletonMap("maxRequests",
+ String.valueOf(config.getMaxConcurrentHttpRequests())));
+ }
+
+ if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
+ addFilter(context,
+ new
RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond()));
+ }
+
+ if (!config.getBrokerInterceptors().isEmpty()
+ || !config.isDisableBrokerInterceptors()) {
ExceptionHandler handler = new ExceptionHandler();
// Enable PreInterceptFilter only when interceptors are enabled
- context.addFilter(new FilterHolder(new
PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
- MATCH_ALL, EnumSet.allOf(DispatcherType.class));
- context.addFilter(new FilterHolder(new
ProcessHandlerFilter(pulsar)),
- MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ addFilter(context, new
PreInterceptFilter(pulsar.getBrokerInterceptor(), handler));
+ addFilter(context, new ProcessHandlerFilter(pulsar));
}
if (requiresAuthentication &&
pulsar.getConfiguration().isAuthenticationEnabled()) {
- FilterHolder filter = new FilterHolder(new AuthenticationFilter(
+ addFilter(context, new AuthenticationFilter(
pulsar.getBrokerService().getAuthenticationService()));
- context.addFilter(filter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
}
- if (pulsar.getConfig().isDisableHttpDebugMethods()) {
- FilterHolder filter = new FilterHolder(new
DisableDebugHttpMethodFilter(pulsar.getConfig()));
- context.addFilter(filter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
- }
-
- if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
- context.addFilter(
- new FilterHolder(new
RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond())),
- MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ if (config.isDisableHttpDebugMethods()) {
+ addFilter(context, new DisableDebugHttpMethodFilter(config));
}
- if (pulsar.getConfig().getHttpMaxRequestSize() > 0) {
- context.addFilter(new FilterHolder(
+ if (config.getHttpMaxRequestSize() > 0) {
+ addFilter(context,
new MaxRequestSizeFilter(
- pulsar.getConfig().getHttpMaxRequestSize())),
- MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ config.getHttpMaxRequestSize()));
}
- FilterHolder responseFilter = new FilterHolder(new
ResponseHandlerFilter(pulsar));
- context.addFilter(responseFilter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
+ addFilter(context, new ResponseHandlerFilter(pulsar));
handlers.add(context);
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 8a247c9cfa5..73ba9511777 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -138,6 +138,27 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
)
private double httpRequestsMaxPerSecond = 100.0;
+ @FieldContext(category = CATEGORY_WORKER, doc = "Max concurrent web
requests")
+ private int maxConcurrentHttpRequests = 1024;
+
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Capacity for thread pool queue in the HTTP server"
+ + " Default is set to 8192."
+ )
+ private int httpServerThreadPoolQueueSize = 8192;
+
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Capacity for accept queue in the HTTP server"
+ + " Default is set to 8192."
+ )
+ private int httpServerAcceptQueueSize = 8192;
+
+ @FieldContext(category = CATEGORY_WORKER, doc = "Maximum number of inbound
http connections. "
+ + "(0 to disable limiting)")
+ private int maxHttpServerConnections = 2048;
+
@FieldContext(
category = CATEGORY_WORKER,
required = false,
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index d1eda2acc49..cef4624a91e 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -103,6 +103,11 @@
<artifactId>jetty-servlet</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-core</artifactId>
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0c6821fd9e1..feec5e3d4e5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,15 +18,16 @@
*/
package org.apache.pulsar.functions.worker.rest;
+import static org.apache.pulsar.broker.web.Filters.addFilter;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
-import java.util.EnumSet;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import javax.servlet.DispatcherType;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.broker.web.Filters;
import org.apache.pulsar.broker.web.JettyRequestLogFactory;
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
@@ -35,6 +36,7 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
+import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -43,9 +45,9 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
@@ -57,7 +59,6 @@ public class WorkerServer {
private final WorkerService workerService;
private final AuthenticationService authenticationService;
private static final String MATCH_ALL = "/*";
- private static final int MAX_CONCURRENT_REQUESTS = 1024;
private final WebExecutorThreadPool webServerExecutor;
private Server server;
@@ -68,7 +69,8 @@ public class WorkerServer {
this.workerConfig = workerService.getWorkerConfig();
this.workerService = workerService;
this.authenticationService = authenticationService;
- this.webServerExecutor = new
WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(),
"function-web");
+ this.webServerExecutor = new
WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(),
"function-web",
+ this.workerConfig.getHttpServerThreadPoolQueueSize());
init();
}
@@ -79,9 +81,12 @@ public class WorkerServer {
private void init() {
server = new Server(webServerExecutor);
+ if (workerConfig.getMaxHttpServerConnections() > 0) {
+ server.addBean(new
ConnectionLimit(workerConfig.getMaxHttpServerConnections(), server));
+ }
List<ServerConnector> connectors = new ArrayList<>();
- httpConnector = new ServerConnector(server, 1, 1);
+ httpConnector = new ServerConnector(server);
httpConnector.setPort(this.workerConfig.getWorkerPort());
connectors.add(httpConnector);
@@ -126,7 +131,7 @@ public class WorkerServer {
this.workerConfig.isTlsRequireTrustedClientCertOnConnect(),
true,
this.workerConfig.getTlsCertRefreshCheckDurationSec());
- httpsConnector = new ServerConnector(server, 1, 1,
sslCtxFactory);
+ httpsConnector = new ServerConnector(server, sslCtxFactory);
httpsConnector.setPort(this.workerConfig.getWorkerPortTls());
connectors.add(httpsConnector);
} catch (Exception e) {
@@ -135,7 +140,7 @@ public class WorkerServer {
}
// Limit number of concurrent HTTP connections to avoid getting out of
file descriptors
- connectors.forEach(c -> c.setAcceptQueueSize(MAX_CONCURRENT_REQUESTS /
connectors.size()));
+ connectors.forEach(c ->
c.setAcceptQueueSize(workerConfig.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new
ServerConnector[connectors.size()]));
}
@@ -161,22 +166,29 @@ public class WorkerServer {
final ServletHolder apiServlet =
new ServletHolder(new ServletContainer(config));
- contextHandler.addServlet(apiServlet, "/*");
- if (workerService.getWorkerConfig().isAuthenticationEnabled() &&
requireAuthentication) {
- FilterHolder filter = new FilterHolder(new
AuthenticationFilter(authenticationService));
- contextHandler.addFilter(filter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
- }
+ contextHandler.addServlet(apiServlet, MATCH_ALL);
+
+ addQosFilterIfNeeded(contextHandler, workerService.getWorkerConfig());
if (workerService.getWorkerConfig().isHttpRequestsLimitEnabled()) {
- contextHandler.addFilter(
- new FilterHolder(
- new
RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond())),
- MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ addFilter(contextHandler,
+ new
RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond()));
+ }
+
+ if (workerService.getWorkerConfig().isAuthenticationEnabled() &&
requireAuthentication) {
+ addFilter(contextHandler, new
AuthenticationFilter(authenticationService));
}
return contextHandler;
}
+ private static void addQosFilterIfNeeded(ServletContextHandler context,
WorkerConfig workerConfig) {
+ if (workerConfig.getMaxConcurrentHttpRequests() > 0) {
+ Filters.addFilterClass(context, QoSFilter.class,
Collections.singletonMap("maxRequests",
+
String.valueOf(workerConfig.getMaxConcurrentHttpRequests())));
+ }
+ }
+
public void stop() {
if (this.server != null) {
try {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e387401dc61..e64d51c161a 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -607,6 +607,27 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int httpNumThreads = Math.max(8, 2 *
Runtime.getRuntime().availableProcessors());
+ @FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web
requests")
+ private int maxConcurrentHttpRequests = 1024;
+
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Capacity for thread pool queue in the HTTP server"
+ + " Default is set to 8192."
+ )
+ private int httpServerThreadPoolQueueSize = 8192;
+
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Capacity for accept queue in the HTTP server"
+ + " Default is set to 8192."
+ )
+ private int httpServerAcceptQueueSize = 8192;
+
+ @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound
http connections. "
+ + "(0 to disable limiting)")
+ private int maxHttpServerConnections = 2048;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty IO."
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 84b1153b386..35f3c9a709d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -18,16 +18,16 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.apache.pulsar.broker.web.Filters.addFilter;
+import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
-import javax.servlet.DispatcherType;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
@@ -48,9 +49,9 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
@@ -77,8 +78,12 @@ public class WebServer {
private ServerConnector connectorTls;
public WebServer(ProxyConfiguration config, AuthenticationService
authenticationService) {
- this.webServiceExecutor = new
WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
+ this.webServiceExecutor = new
WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web",
+ config.getHttpServerThreadPoolQueueSize());
this.server = new Server(webServiceExecutor);
+ if (config.getMaxHttpServerConnections() > 0) {
+ server.addBean(new
ConnectionLimit(config.getMaxHttpServerConnections(), server));
+ }
this.authenticationService = authenticationService;
this.config = config;
@@ -89,7 +94,7 @@ public class WebServer {
if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
- connector = new ServerConnector(server, 1, 1, new
HttpConnectionFactory(httpConfig));
+ connector = new ServerConnector(server, new
HttpConnectionFactory(httpConfig));
connector.setHost(config.getBindAddress());
connector.setPort(externalServicePort);
connectors.add(connector);
@@ -122,7 +127,7 @@ public class WebServer {
true,
config.getTlsCertRefreshCheckDurationSec());
}
- connectorTls = new ServerConnector(server, 1, 1,
sslCtxFactory);
+ connectorTls = new ServerConnector(server, sslCtxFactory);
connectorTls.setPort(config.getWebServicePortTls().get());
connectorTls.setHost(config.getBindAddress());
connectors.add(connectorTls);
@@ -132,7 +137,7 @@ public class WebServer {
}
// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
- connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 /
connectors.size()));
+ connectors.stream().forEach(c ->
c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new
ServerConnector[connectors.size()]));
}
@@ -159,23 +164,31 @@ public class WebServer {
ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
- context.addServlet(servletHolder, "/*");
+ context.addServlet(servletHolder, MATCH_ALL);
for (Pair<String, Object> attribute : attributes) {
context.setAttribute(attribute.getLeft(), attribute.getRight());
}
- if (config.isAuthenticationEnabled() && requireAuthentication) {
- FilterHolder filter = new FilterHolder(new
AuthenticationFilter(authenticationService));
- context.addFilter(filter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
- }
+
+ addQosFilterIfNeeded(context);
if (config.isHttpRequestsLimitEnabled()) {
- context.addFilter(new FilterHolder(new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())), MATCH_ALL,
- EnumSet.allOf(DispatcherType.class));
+ addFilter(context, new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond()));
+ }
+
+ if (config.isAuthenticationEnabled() && requireAuthentication) {
+ addFilter(context, new
AuthenticationFilter(authenticationService));
}
handlers.add(context);
}
+ private void addQosFilterIfNeeded(ServletContextHandler context) {
+ if (config.getMaxConcurrentHttpRequests() > 0) {
+ addFilterClass(context, QoSFilter.class,
Collections.singletonMap("maxRequests",
+ String.valueOf(config.getMaxConcurrentHttpRequests())));
+ }
+ }
+
public void addRestResources(String basePath, String javaPackages, String
attribute, Object attributeValue) {
ResourceConfig config = new ResourceConfig();
config.packages("jersey.config.server.provider.packages",
javaPackages);
@@ -184,7 +197,7 @@ public class WebServer {
servletHolder.setAsyncSupported(true);
ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
- context.addServlet(servletHolder, "/*");
+ context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute(attribute, attributeValue);
handlers.add(context);
}
diff --git a/pulsar-websocket/pom.xml b/pulsar-websocket/pom.xml
index bf61e3be185..40245300844 100644
--- a/pulsar-websocket/pom.xml
+++ b/pulsar-websocket/pom.xml
@@ -117,6 +117,11 @@
<artifactId>javax-websocket-client-impl</artifactId>
<version>${jetty.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 6a22e984827..924f8226f56 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.websocket.service;
+import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -33,6 +35,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -42,6 +45,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.QoSFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
@@ -49,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProxyServer {
+ private static final String MATCH_ALL = "/*";
private final Server server;
private final List<Handler> handlers = new ArrayList<>();
private final WebSocketProxyConfiguration conf;
@@ -60,8 +65,12 @@ public class ProxyServer {
public ProxyServer(WebSocketProxyConfiguration config)
throws PulsarClientException, MalformedURLException,
PulsarServerException {
this.conf = config;
- executorService = new
WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web");
+ executorService = new
WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web",
+ config.getHttpServerThreadPoolQueueSize());
this.server = new Server(executorService);
+ if (config.getMaxHttpServerConnections() > 0) {
+ server.addBean(new
ConnectionLimit(config.getMaxHttpServerConnections(), server));
+ }
List<ServerConnector> connectors = new ArrayList<>();
if (config.getWebServicePort().isPresent()) {
@@ -80,7 +89,7 @@ public class ProxyServer {
config.isTlsRequireTrustedClientCertOnConnect(),
true,
config.getTlsCertRefreshCheckDurationSec());
- connectorTls = new ServerConnector(server, -1, -1,
sslCtxFactory);
+ connectorTls = new ServerConnector(server, sslCtxFactory);
connectorTls.setPort(config.getWebServicePortTls().get());
connectors.add(connectorTls);
} catch (Exception e) {
@@ -90,7 +99,7 @@ public class ProxyServer {
// Limit number of concurrent HTTP connections to avoid getting out of
// file descriptors
- connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 /
connectors.size()));
+ connectors.stream().forEach(c ->
c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new
ServerConnector[connectors.size()]));
}
@@ -99,7 +108,8 @@ public class ProxyServer {
ServletHolder servletHolder = new ServletHolder("ws-events",
socketServlet);
ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
- context.addServlet(servletHolder, "/*");
+ context.addServlet(servletHolder, MATCH_ALL);
+ addQosFilterIfNeeded(context);
handlers.add(context);
}
@@ -111,11 +121,19 @@ public class ProxyServer {
servletHolder.setAsyncSupported(true);
ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
- context.addServlet(servletHolder, "/*");
+ context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute(attribute, attributeValue);
+ addQosFilterIfNeeded(context);
handlers.add(context);
}
+ private void addQosFilterIfNeeded(ServletContextHandler context) {
+ if (conf.getMaxConcurrentHttpRequests() > 0) {
+ addFilterClass(context, QoSFilter.class,
Collections.singletonMap("maxRequests",
+ String.valueOf(conf.getMaxConcurrentHttpRequests())));
+ }
+ }
+
public void start() throws PulsarServerException {
log.info("Starting web socket proxy at port {}",
Arrays.stream(server.getConnectors())
.map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index e69400ef04a..0b29dd15e07 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -135,6 +135,23 @@ public class WebSocketProxyConfiguration implements
PulsarConfiguration {
@FieldContext(doc = "Number of threads used by Websocket service")
private int webSocketNumServiceThreads = 20;
+ @FieldContext(doc = "Max concurrent web requests")
+ private int maxConcurrentHttpRequests = 1024;
+
+ @FieldContext(doc = "Capacity for thread pool queue in the HTTP server."
+ + " Default is set to 8192."
+ )
+ private int httpServerThreadPoolQueueSize = 8192;
+
+ @FieldContext(doc = "Capacity for accept queue in the HTTP server."
+ + " Default is set to 8192."
+ )
+ private int httpServerAcceptQueueSize = 8192;
+
+ @FieldContext(doc = "Maximum number of inbound http connections. "
+ + "(0 to disable limiting)")
+ private int maxHttpServerConnections = 2048;
+
@FieldContext(doc = "Number of connections per broker in Pulsar client used in WebSocket proxy")
private int webSocketConnectionsPerBroker =
Runtime.getRuntime().availableProcessors();