This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d2ed37e6149 [improve][proxy]configure frame size dynamically in proxy 
(#17261)
d2ed37e6149 is described below

commit d2ed37e61490da85023871c5663051767e1bb9e9
Author: Qiang Huang <[email protected]>
AuthorDate: Fri Aug 26 11:54:36 2022 +0800

    [improve][proxy]configure frame size dynamically in proxy (#17261)
---
 .../java/org/apache/pulsar/proxy/server/DirectProxyHandler.java    | 3 ++-
 .../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java    | 7 +++++++
 .../org/apache/pulsar/proxy/server/ServiceChannelInitializer.java  | 4 +++-
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 2f067282115..1179b4db880 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -209,7 +209,8 @@ public class DirectProxyHandler {
                             new 
ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
                 }
                 ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(
-                    Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+                        service.getConfiguration().getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
+                        4));
                 ch.pipeline().addLast("proxyOutboundHandler",
                         (ChannelHandler) new ProxyBackendHandler(config, 
protocolVersion, remoteHost));
             }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 2555d648000..af3e55d1993 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -40,6 +40,7 @@ import 
org.apache.pulsar.common.configuration.PropertiesContext;
 import org.apache.pulsar.common.configuration.PropertyContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.sasl.SaslConstants;
 
 @Getter
@@ -132,6 +133,12 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int metadataStoreCacheExpirySeconds = 300;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Max size of messages.",
+            maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
+    private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+
     @Deprecated
     @FieldContext(
         category = CATEGORY_BROKER_DISCOVERY,
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index dced772e30e..32c50400473 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -43,6 +43,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final boolean enableTls;
     private final boolean tlsEnabledWithKeyStore;
     private final int brokerProxyReadTimeoutMs;
+    private final int maxMessageSize;
 
     private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
     private NettySSLContextAutoRefreshBuilder 
serverSSLContextAutoRefreshBuilder;
@@ -54,6 +55,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
         this.enableTls = enableTls;
         this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
         this.brokerProxyReadTimeoutMs = 
serviceConfig.getBrokerProxyReadTimeoutMs();
+        this.maxMessageSize = serviceConfig.getMaxMessageSize();
 
         if (enableTls) {
             if (tlsEnabledWithKeyStore) {
@@ -110,7 +112,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new 
OptionalProxyProtocolDecoder());
         }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-                Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+                this.maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 
4, 0, 4));
 
         ch.pipeline().addLast("handler", new ProxyConnection(proxyService, 
proxyService.getDnsAddressResolverGroup()));
     }

Reply via email to