This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d2ed37e6149 [improve][proxy]configure frame size dynamically in proxy (#17261)
d2ed37e6149 is described below
commit d2ed37e61490da85023871c5663051767e1bb9e9
Author: Qiang Huang <[email protected]>
AuthorDate: Fri Aug 26 11:54:36 2022 +0800
[improve][proxy]configure frame size dynamically in proxy (#17261)
---
.../java/org/apache/pulsar/proxy/server/DirectProxyHandler.java | 3 ++-
.../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java | 7 +++++++
.../org/apache/pulsar/proxy/server/ServiceChannelInitializer.java | 4 +++-
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 2f067282115..1179b4db880 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -209,7 +209,8 @@ public class DirectProxyHandler {
new
ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
}
ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ service.getConfiguration().getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
+ 4));
ch.pipeline().addLast("proxyOutboundHandler",
(ChannelHandler) new ProxyBackendHandler(config,
protocolVersion, remoteHost));
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 2555d648000..af3e55d1993 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -40,6 +40,7 @@ import
org.apache.pulsar.common.configuration.PropertiesContext;
import org.apache.pulsar.common.configuration.PropertyContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
@Getter
@@ -132,6 +133,12 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int metadataStoreCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max size of messages.",
+ maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
+ private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+
@Deprecated
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index dced772e30e..32c50400473 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -43,6 +43,7 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
private final boolean enableTls;
private final boolean tlsEnabledWithKeyStore;
private final int brokerProxyReadTimeoutMs;
+ private final int maxMessageSize;
private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
private NettySSLContextAutoRefreshBuilder
serverSSLContextAutoRefreshBuilder;
@@ -54,6 +55,7 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
this.enableTls = enableTls;
this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
this.brokerProxyReadTimeoutMs =
serviceConfig.getBrokerProxyReadTimeoutMs();
+ this.maxMessageSize = serviceConfig.getMaxMessageSize();
if (enableTls) {
if (tlsEnabledWithKeyStore) {
@@ -110,7 +112,7 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new
OptionalProxyProtocolDecoder());
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ this.maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0,
4, 0, 4));
ch.pipeline().addLast("handler", new ProxyConnection(proxyService,
proxyService.getDnsAddressResolverGroup()));
}