This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 144e064c145613f697933f5e959770b1ee3f81d5 Author: Zixuan Liu <[email protected]> AuthorDate: Fri Feb 6 23:02:58 2026 +0800 [improve][broker] Add idle timeout support for http (#25224) (cherry picked from commit 63220eadcfc11f43d4537327a3593603ee5be697) --- conf/broker.conf | 3 +++ conf/proxy.conf | 6 ++++++ conf/standalone.conf | 5 ++++- .../org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++ .../java/org/apache/pulsar/broker/web/WebService.java | 6 +++++- .../org/apache/pulsar/proxy/server/AdminProxyHandler.java | 2 ++ .../apache/pulsar/proxy/server/ProxyConfiguration.java | 15 +++++++++++++++ .../java/org/apache/pulsar/proxy/server/WebServer.java | 6 +++++- 8 files changed, 46 insertions(+), 3 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 25e5bcc15cc..503a7a0e11f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1067,6 +1067,9 @@ httpServerThreadPoolQueueSize=8192 # Capacity for accept queue in the HTTP server httpServerAcceptQueueSize=8192 +# Idle timeout for HTTP server connections in milliseconds +httpServerIdleTimeout=30000 + # Maximum number of inbound http connections. (0 to disable limiting) maxHttpServerConnections=2048 diff --git a/conf/proxy.conf b/conf/proxy.conf index 10e5cdfd00d..0cfb554cf80 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -315,6 +315,9 @@ httpServerThreadPoolQueueSize=8192 # Capacity for accept queue in the HTTP server httpServerAcceptQueueSize=8192 +#Idle timeout for HTTP server connections in milliseconds +httpServerIdleTimeout=30000 + # Maximum number of inbound http connections. (0 to disable limiting) maxHttpServerConnections=2048 @@ -326,6 +329,9 @@ maxConcurrentHttpRequests=1024 # denial of service attacks. httpMaxRequestHeaderSize = 8192 +# The idle timeout value for HTTP proxy is in millisecond +httpProxyIdleTimeout=30000 + ## Configure the datasource of basic authenticate, supports the file and Base64 format. # file: # basicAuthConf=/path/my/.htpasswd diff --git a/conf/standalone.conf b/conf/standalone.conf index 091cbf5661f..f9c7ccb658f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1432,4 +1432,7 @@ topicCompactionRetainNullKey=false # If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory", # will create topic compaction service based on message eventTime. # By default compaction service is based on message publishing order. -compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory \ No newline at end of file +compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory + +# Idle timeout for HTTP server connections in milliseconds +httpServerIdleTimeout=30000 \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c70ae2bb5ee..dede4543fc3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -347,6 +347,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int httpServerAcceptQueueSize = 8192; + @FieldContext( + category = CATEGORY_HTTP, + doc = "Idle timeout for HTTP server connections in milliseconds." + ) + private int httpServerIdleTimeout = 30 * 1000; + @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound http connections. " + "(0 to disable limiting)") private int maxHttpServerConnections = 2048; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 96e8a516a6c..7fa954948a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -134,6 +134,7 @@ public class WebService implements AutoCloseable { httpConfig.addCustomizer(new ForwardedRequestCustomizer()); } httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize()); + httpConfig.setIdleTimeout(pulsar.getConfig().getHttpServerIdleTimeout()); HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig); if (port.isPresent()) { List<ConnectionFactory> connectionFactories = new ArrayList<>(); @@ -191,7 +192,10 @@ public class WebService implements AutoCloseable { } // Limit number of concurrent HTTP connections to avoid getting out of file descriptors - connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); + connectors.forEach(c -> { + c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()); + c.setIdleTimeout(pulsar.getConfig().getHttpServerIdleTimeout()); + }); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); filterInitializer = new FilterInitializer(pulsar); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 090f2f4a384..e4a38d7a0be 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -126,6 +126,8 @@ class AdminProxyHandler extends ProxyServlet { protocolHandlers.put(new RedirectProtocolHandler(httpClient)); } + httpClient.setIdleTimeout(config.getHttpProxyIdleTimeout()); + setTimeout(config.getHttpProxyTimeout()); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 0bb8c51ede0..cee0a2ca47d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -801,6 +801,14 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private int httpProxyTimeout = 5 * 60 * 1000; + @FieldContext( + minValue = 0, + category = CATEGORY_HTTP, + doc = "Http proxy idle timeout.\n\n" + + "The idle timeout value for HTTP proxy is in millisecond." + ) + private int httpProxyIdleTimeout = 30 * 1000; + @FieldContext( minValue = 1, category = CATEGORY_HTTP, @@ -825,6 +833,13 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private int httpServerAcceptQueueSize = 8192; + @FieldContext( + minValue = 0, + category = CATEGORY_HTTP, + doc = "Idle timeout for HTTP server connections in milliseconds." + ) + private int httpServerIdleTimeout = 30 * 1000; + @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound http connections. " + "(0 to disable limiting)") private int maxHttpServerConnections = 2048; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index 3c472135bdf..1d0d03d1dc7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -114,6 +114,7 @@ public class WebServer { } httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize()); httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize()); + httpConfig.setIdleTimeout(config.getHttpServerIdleTimeout()); HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig); if (config.getWebServicePort().isPresent()) { @@ -167,7 +168,10 @@ public class WebServer { } // Limit number of concurrent HTTP connections to avoid getting out of file descriptors - connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); + connectors.stream().forEach(c -> { + c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()); + c.setIdleTimeout(config.getHttpServerIdleTimeout()); + }); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); filterInitializer = new FilterInitializer(config, authenticationService);
