This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da1cd78  [WIP] PIP-36: Support set message size in broker.conf (#4247)
da1cd78 is described below

commit da1cd78d82c3d24b38805b7c34d3852d9ec73b63
Author: Yong Zhang <[email protected]>
AuthorDate: Thu May 16 05:21:29 2019 +0800

    [WIP] PIP-36: Support set message size in broker.conf (#4247)
    
    * Support set message size
    ---
    
    *Motivation*
    
    Currently Pulsar only support 5MB size of messages.But there are many cases 
will use more than 5MB message
    to transfer.
    https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
    
    *Modifications*
    
    - Add message size in protocol
    - Automaticlly adjust client message size by server
    
    * Use `maxMessageSize` to set `nettyFrameSize` in bookie client
    ---
    
    *Motivation*
    
    When broker specify a `maxMessageSize` bookie should accept this value as 
`nettyFrameSize`
    
    *Modifications*
    
    - Use `cnx().getMaxMessageSize`
    - Discovery service only redirect so use the constant value `5 * 1024 * 
1024` as message size
    - Put `MAX_METADATA_SIZE` as constant value in `InternalConfigurationData`
    
    * Use `Commands` to store message setting
    ---
    
    *Modifications*
    
    - use `Commands` to store default `MAX_MESSAGE_SIZE` and 
`MESSAGE_SIZE_FRAME_PADDING`
    - replace `LengthFieldBasedFrameDecoder` when has set message size
    - replace `PulsarDecoder.MaxMessageSize`
    
    * Fix some error
    
    * Fix license header
    
    * Add test and make `ClientCnx.maxMessageSize` static
    ---
    
    *Motivation*
    
    - Even if the cnx can't use, `maxMessageSize` should be used at compare 
message size. So it should as a static variable
    
    * fix code style
    
    * Fix license header
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 ++
 .../org/apache/pulsar/PulsarBrokerStarter.java     |   8 ++
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |   3 +
 .../broker/service/PulsarChannelInitializer.java   |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 .../pulsar/broker/service/MaxMessageSizeTest.java  | 154 +++++++++++++++++++++
 .../client/api/SimpleProducerConsumerTest.java     |  14 +-
 .../client/api/v1/V1_ProducerConsumerTest.java     |   5 +-
 .../pulsar/client/impl/MessageParserTest.java      |   5 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  16 ++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   5 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  16 +--
 .../client/impl/PulsarChannelInitializer.java      |   9 +-
 .../org/apache/pulsar/common/api/Commands.java     |  14 +-
 .../apache/pulsar/common/api/PulsarDecoder.java    |   5 -
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  57 ++++++++
 .../pulsar/common/api/raw/MessageParser.java       |   8 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 .../pulsar/discovery/service/ServerConnection.java |   1 +
 .../service/ServiceChannelInitializer.java         |   6 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    |  46 +++++-
 .../pulsar/proxy/server/ParserProxyHandler.java    |   9 +-
 .../proxy/server/ServiceChannelInitializer.java    |   6 +-
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  12 ++
 .../pulsar/sql/presto/PulsarRecordCursor.java      |   3 +-
 25 files changed, 369 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5428d55..5209e31 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -33,6 +33,8 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -489,6 +491,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " Using a value of 0, is disabling 
maxConsumersPerSubscription-limit check.")
     private int maxConsumersPerSubscription = 0;
 
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Max size of messages.",
+        maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
+    private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+
     /***** --- TLS --- ****/
     @FieldContext(
         category = CATEGORY_TLS,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 3ef104f..8f291ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -45,10 +45,13 @@ import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
+import org.apache.bookkeeper.util.DirectMemoryUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.slf4j.Logger;
@@ -139,6 +142,11 @@ public class PulsarBrokerStarter {
                 brokerConfig = loadConfig(starterArguments.brokerConfigFile);
             }
 
+            int maxFrameSize = brokerConfig.getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING;
+            if (maxFrameSize >= DirectMemoryUtils.maxDirectMemory()) {
+                throw new IllegalArgumentException("Max message size need 
smaller than jvm directMemory");
+            }
+
             // init functions worker
             if (starterArguments.runFunctionsWorker || 
brokerConfig.isFunctionsWorkerEnabled()) {
                 WorkerConfig workerConfig;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index f15b9a3..65207e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -28,6 +28,8 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
@@ -57,6 +59,7 @@ public class BookKeeperClientFactoryImpl implements 
BookKeeperClientFactory {
         bkConf.setUseV2WireProtocol(conf.isBookkeeperUseV2WireProtocol());
         bkConf.setEnableDigestTypeAutodetection(true);
         bkConf.setStickyReadsEnabled(conf.isBookkeeperEnableStickyReads());
+        bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING);
 
         bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         if (conf.isBookkeeperClientHealthCheckEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index ff24caa..f3e8214 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.broker.service;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.ByteBufPair;
-import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.util.NettySslContextBuilder;
 
 import io.netty.channel.ChannelInitializer;
@@ -35,6 +36,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final PulsarService pulsar;
     private final boolean enableTls;
     private final NettySslContextBuilder sslCtxRefresher;
+    private final ServiceConfiguration brokerConf;
 
     /**
      *
@@ -54,6 +56,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         } else {
             this.sslCtxRefresher = null;
         }
+        this.brokerConf = pulsar.getConfiguration();
     }
 
     @Override
@@ -65,7 +68,8 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
-        ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+            brokerConf.getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ServerCnx(pulsar));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6fe3864..b5b6fe9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -136,6 +136,7 @@ public class ServerCnx extends PulsarHandler {
     private boolean authenticateOriginalAuthData;
     private final boolean schemaValidationEnforced;
     private String authMethod = "none";
+    private final int maxMessageSize;
 
     enum State {
         Start, Connected, Failed, Connecting
@@ -156,6 +157,7 @@ public class ServerCnx extends PulsarHandler {
         this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
         this.authenticateOriginalAuthData = 
service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
         this.schemaValidationEnforced = 
pulsar.getConfiguration().isSchemaValidationEnforced();
+        this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
     }
 
     @Override
@@ -455,7 +457,7 @@ public class ServerCnx extends PulsarHandler {
 
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion) 
{
-        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion));
+        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize));
         state = State.Connected;
         remoteEndpointProtocolVersion = clientProtoVersion;
         if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* 
ignore default version: pulsar client */) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
new file mode 100644
index 0000000..4fb3183
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MaxMessageSizeTest {
+    private static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
+    PulsarService pulsar;
+    ServiceConfiguration configuration;
+
+    PulsarAdmin admin;
+
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int BROKER_WEBSERVER_PORT = PortManager.nextFreePort();
+
+    @BeforeMethod
+    void setup() {
+        try {
+            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager::nextFreePort);
+            ServerConfiguration conf = new ServerConfiguration();
+            conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024);
+            bkEnsemble.startStandalone(conf, false);
+
+            configuration = new ServiceConfiguration();
+            configuration.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
+            configuration.setAdvertisedAddress("localhost");
+            configuration.setWebServicePort(BROKER_WEBSERVER_PORT);
+            configuration.setClusterName("max_message_test");
+            configuration.setBrokerServicePort(BROKER_SERVICE_PORT);
+            configuration.setAuthorizationEnabled(false);
+            configuration.setAuthenticationEnabled(false);
+            configuration.setManagedLedgerMaxEntriesPerLedger(5);
+            configuration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+            configuration.setMaxMessageSize(10 * 1024 * 1024);
+
+            pulsar = new PulsarService(configuration);
+            pulsar.start();
+
+            String url = "http://127.0.0.1:"; + BROKER_WEBSERVER_PORT;
+            admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
+            admin.clusters().createCluster("max_message_test", new 
ClusterData(url));
+            admin.tenants()
+                 .createTenant("test", new 
TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
+            admin.namespaces().createNamespace("test/message", 
Sets.newHashSet("max_message_test"));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @AfterMethod
+    void shutdown() {
+        try {
+            pulsar.close();
+            bkEnsemble.stop();
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testMaxMessageSetting() throws PulsarClientException {
+
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:" + 
BROKER_SERVICE_PORT).build();
+        String topicName = "persistent://test/message/topic1";
+        Producer producer = 
client.newProducer().topic(topicName).sendTimeout(60, 
TimeUnit.SECONDS).create();
+        Consumer consumer = 
client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
+
+        // less than 5MB message
+
+        byte[] normalMsg = new byte[2 * 1024 * 1024];
+
+        try {
+            producer.send(normalMsg);
+        } catch (PulsarClientException e) {
+            Assert.fail("Shouldn't have exception at here", e);
+        }
+
+        byte[] consumerNormalMsg = consumer.receive().getData();
+        Assert.assertEquals(normalMsg, consumerNormalMsg);
+
+        // equal 5MB message
+        byte[] limitMsg = new byte[5 * 1024 * 1024];
+        try {
+            producer.send(limitMsg);
+        } catch (PulsarClientException e) {
+            Assert.fail("Shouldn't have exception at here", e);
+        }
+
+        byte[] consumerLimitMsg = consumer.receive().getData();
+        Assert.assertEquals(limitMsg, consumerLimitMsg);
+
+        // less than 10MB message
+        byte[] newNormalMsg = new byte[8 * 1024 * 1024];
+        try {
+            producer.send(newNormalMsg);
+        } catch (PulsarClientException e) {
+            Assert.fail("Shouldn't have exception at here", e);
+        }
+
+        byte[] consumerNewNormalMsg = consumer.receive().getData();
+        Assert.assertEquals(newNormalMsg, consumerNewNormalMsg);
+
+        // equals 10MB message
+        byte[] newLimitMsg = new byte[10 * 1024 * 1024];
+        try {
+            producer.send(newLimitMsg);
+            Assert.fail("Shouldn't send out this message");
+        } catch (PulsarClientException e) {
+            // no-op
+        }
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer.close();
+        client.close();
+
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7192e7f..32c6fa8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -634,10 +634,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
 
         // Messages are allowed up to MaxMessageSize
-        producer.newMessage().value(new byte[PulsarDecoder.MaxMessageSize]);
+        producer.newMessage().value(new 
byte[Commands.DEFAULT_MAX_MESSAGE_SIZE]);
 
         try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+            producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
             fail("Should have thrown exception");
         } catch (PulsarClientException.InvalidMessageException e) {
             // OK
@@ -671,7 +671,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .compressionType(CompressionType.LZ4)
             .create();
-        producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+        producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
         producer.close();
 
         // (b) batch-msg with compression
@@ -680,7 +680,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .compressionType(CompressionType.LZ4)
             .create();
-        producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+        producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
         producer.close();
 
         // (c) non-batch msg without compression
@@ -690,7 +690,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             .compressionType(CompressionType.NONE)
             .create();
         try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+            producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
             fail("Should have thrown exception");
         } catch (PulsarClientException.InvalidMessageException e) {
             // OK
@@ -704,7 +704,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .compressionType(CompressionType.LZ4).create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe();
-        byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
+        byte[] content = new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 10];
         producer.send(content);
         assertEquals(consumer.receive().getData(), content);
         producer.close();
@@ -716,7 +716,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             .compressionType(CompressionType.NONE)
             .create();
         try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+            producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
             fail("Should have thrown exception");
         } catch (PulsarClientException.InvalidMessageException e) {
             // OK
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index fecef79..2040979 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -607,10 +608,10 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
 
         // Messages are allowed up to MaxMessageSize
-        producer.newMessage().value(new byte[PulsarDecoder.MaxMessageSize]);
+        producer.newMessage().value(new 
byte[Commands.DEFAULT_MAX_MESSAGE_SIZE]);
 
         try {
-            producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+            producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
             fail("Should have thrown exception");
         } catch (PulsarClientException.InvalidMessageException e) {
             // OK
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 5ec6332..1e0c45c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -34,6 +34,7 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.TopicName;
@@ -91,7 +92,7 @@ public class MessageParserTest extends 
MockedPulsarServiceBaseTest {
                 MessageParser.parseMessage(topicName, entry.getLedgerId(), 
entry.getEntryId(), entry.getDataBuffer(),
                         (message) -> {
                             messages.add(message);
-                        });
+                        }, Commands.DEFAULT_MAX_MESSAGE_SIZE);
             } finally {
                 entry.release();
             }
@@ -133,7 +134,7 @@ public class MessageParserTest extends 
MockedPulsarServiceBaseTest {
             MessageParser.parseMessage(topicName, entry.getLedgerId(), 
entry.getEntryId(), entry.getDataBuffer(),
                     (message) -> {
                         messages.add(message);
-                    });
+                    }, Commands.DEFAULT_MAX_MESSAGE_SIZE);
         } finally {
             entry.release();
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 1a88afd..7a58357 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -30,6 +30,7 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
 
@@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import javax.net.ssl.SSLSession;
 
+import lombok.Getter;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.PulsarVersion;
@@ -117,6 +119,10 @@ public class ClientCnx extends PulsarHandler {
             .newUpdater(ClientCnx.class, "numberOfRejectRequests");
     @SuppressWarnings("unused")
     private volatile int numberOfRejectRequests = 0;
+
+    @Getter
+    private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
     private final int protocolVersion;
@@ -275,7 +281,15 @@ public class ClientCnx extends PulsarHandler {
         }
 
         checkArgument(state == State.SentConnectFrame || state == 
State.Connecting);
-
+        if (connected.hasMaxMessageSize()) {
+            if (log.isDebugEnabled()) {
+                log.debug("{} Connection has max message size setting, replace 
old frameDecoder with "
+                          + "server frame size {}", ctx.channel(), 
connected.getMaxMessageSize());
+            }
+            maxMessageSize = connected.getMaxMessageSize();
+            ctx.pipeline().replace("frameDecoder", "newFrameDecoder", new 
LengthFieldBasedFrameDecoder(
+                connected.getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+        }
         if (log.isDebugEnabled()) {
             log.debug("{} Connection is ready", ctx.channel());
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8909989..b2421f2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -66,7 +66,6 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
-import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
@@ -1141,10 +1140,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
         int uncompressedSize = msgMetadata.getUncompressedSize();
         int payloadSize = payload.readableBytes();
-        if (payloadSize > PulsarDecoder.MaxMessageSize) {
+        if (payloadSize > ClientCnx.getMaxMessageSize()) {
             // payload size is itself corrupted since it cannot be bigger than 
the MaxMessageSize
             log.error("[{}][{}] Got corrupted payload message size {} at {}", 
topic, subscription, payloadSize,
-                    messageId);
+                      messageId);
             discardCorruptedMessage(messageId, currentCnx, 
ValidationError.UncompressedSizeCorruption);
             return null;
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b4a0993..3e32fb9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -61,7 +61,6 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
-import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
@@ -312,15 +311,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
             // validate msg-size (For batching this will be check at the batch 
completion size)
             int compressedSize = compressedPayload.readableBytes();
-
-            if (compressedSize > PulsarDecoder.MaxMessageSize) {
+            if (compressedSize > ClientCnx.getMaxMessageSize()) {
                 compressedPayload.release();
                 String compressedStr = (!isBatchMessagingEnabled() && 
conf.getCompressionType() != CompressionType.NONE)
-                        ? "Compressed"
-                        : "";
+                                           ? "Compressed"
+                                           : "";
                 PulsarClientException.InvalidMessageException 
invalidMessageException = new PulsarClientException.InvalidMessageException(
-                        format("%s Message payload size %d cannot exceed %d 
bytes", compressedStr, compressedSize,
-                                PulsarDecoder.MaxMessageSize));
+                    format("%s Message payload size %d cannot exceed %d 
bytes", compressedStr, compressedSize,
+                           ClientCnx.getMaxMessageSize()));
                 callback.sendComplete(invalidMessageException);
                 return;
             }
@@ -1306,12 +1304,12 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 op = OpSendMsg.create(batchMessageContainer.messages, cmd, 
sequenceId,
                         batchMessageContainer.firstCallback);
 
-                if (encryptedPayload.readableBytes() > 
PulsarDecoder.MaxMessageSize) {
+                if (encryptedPayload.readableBytes() > 
ClientCnx.getMaxMessageSize()) {
                     cmd.release();
                     semaphore.release(numMessagesInBatch);
                     if (op != null) {
                         op.callback.sendComplete(new 
PulsarClientException.InvalidMessageException(
-                                "Message size is bigger than " + 
PulsarDecoder.MaxMessageSize + " bytes"));
+                            "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
                     }
                     return;
                 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 494cb12..5d0f872 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -29,6 +29,7 @@ import io.netty.handler.ssl.SslContext;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.util.SecurityUtility;
 
@@ -38,6 +39,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
     private final Supplier<ClientCnx> clientCnxSupplier;
     private final SslContext sslCtx;
+    private final ClientConfigurationData conf;
 
     public PulsarChannelInitializer(ClientConfigurationData conf, 
Supplier<ClientCnx> clientCnxSupplier)
             throws Exception {
@@ -57,6 +59,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         } else {
             this.sslCtx = null;
         }
+        this.conf = conf;
     }
 
     @Override
@@ -68,7 +71,11 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
-        ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+        ch.pipeline()
+          .addLast("frameDecoder",
+                   new LengthFieldBasedFrameDecoder(
+                       Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
+                       0, 4, 0, 4));
         ch.pipeline().addLast("handler", clientCnxSupplier.get());
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 3692e84..04018f6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -94,6 +94,11 @@ import 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 
 public class Commands {
 
+    // default message size for transfer
+    public static final int DEFAULT_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
+    public static final int MESSAGE_SIZE_FRAME_PADDING = 10 * 1024;
+    public static final int INVALID_MAX_MESSAGE_SIZE = -1;
+
     public static final short magicCrc32c = 0x0e01;
     private static final int checksumSize = 4;
 
@@ -189,9 +194,16 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newConnected(int clientProtocolVersion) {
+    public static ByteBuf newConnected(int clientProtocoVersion) {
+        return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE);
+    }
+
+    public static ByteBuf newConnected(int clientProtocolVersion, int 
maxMessageSize) {
         CommandConnected.Builder connectedBuilder = 
CommandConnected.newBuilder();
         connectedBuilder.setServerVersion("Pulsar Server");
+        if (INVALID_MAX_MESSAGE_SIZE != maxMessageSize) {
+            connectedBuilder.setMaxMessageSize(maxMessageSize);
+        }
 
         // If the broker supports a newer version of the protocol, it will 
anyway advertise the max version that the
         // client supports, to avoid confusing the client.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 0e1ea73..38ff90f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -66,11 +66,6 @@ import org.slf4j.LoggerFactory;
 
 public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
-    // Max message size is limited by max BookKeeper entry size which is 5MB, 
and we need to account
-    // for headers as well.
-    public final static int MaxMessageSize = (5 * 1024 * 1024 - (10 * 1024));
-    public final static int MaxFrameSize = 5 * 1024 * 1024;
-
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         // Get a buffer that contains the full frame
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 053ad19..319302d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6533,6 +6533,10 @@ public final class PulsarApi {
     // optional int32 protocol_version = 2 [default = 0];
     boolean hasProtocolVersion();
     int getProtocolVersion();
+    
+    // optional int32 max_message_size = 3 [default = 5242880];
+    boolean hasMaxMessageSize();
+    int getMaxMessageSize();
   }
   public static final class CommandConnected extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -6611,9 +6615,20 @@ public final class PulsarApi {
       return protocolVersion_;
     }
     
+    // optional int32 max_message_size = 3 [default = 5242880];
+    public static final int MAX_MESSAGE_SIZE_FIELD_NUMBER = 3;
+    private int maxMessageSize_;
+    public boolean hasMaxMessageSize() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getMaxMessageSize() {
+      return maxMessageSize_;
+    }
+    
     private void initFields() {
       serverVersion_ = "";
       protocolVersion_ = 0;
+      maxMessageSize_ = 5242880;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6642,6 +6657,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeInt32(2, protocolVersion_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, maxMessageSize_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -6658,6 +6676,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeInt32Size(2, protocolVersion_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(3, maxMessageSize_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -6775,6 +6797,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000001);
         protocolVersion_ = 0;
         bitField0_ = (bitField0_ & ~0x00000002);
+        maxMessageSize_ = 5242880;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -6816,6 +6840,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000002;
         }
         result.protocolVersion_ = protocolVersion_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.maxMessageSize_ = maxMessageSize_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -6828,6 +6856,9 @@ public final class PulsarApi {
         if (other.hasProtocolVersion()) {
           setProtocolVersion(other.getProtocolVersion());
         }
+        if (other.hasMaxMessageSize()) {
+          setMaxMessageSize(other.getMaxMessageSize());
+        }
         return this;
       }
       
@@ -6871,6 +6902,11 @@ public final class PulsarApi {
               protocolVersion_ = input.readInt32();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              maxMessageSize_ = input.readInt32();
+              break;
+            }
           }
         }
       }
@@ -6934,6 +6970,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional int32 max_message_size = 3 [default = 5242880];
+      private int maxMessageSize_ = 5242880;
+      public boolean hasMaxMessageSize() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getMaxMessageSize() {
+        return maxMessageSize_;
+      }
+      public Builder setMaxMessageSize(int value) {
+        bitField0_ |= 0x00000004;
+        maxMessageSize_ = value;
+        
+        return this;
+      }
+      public Builder clearMaxMessageSize() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        maxMessageSize_ = 5242880;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandConnected)
     }
     
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 1f1d66e..da24a6c 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -50,7 +50,7 @@ public class MessageParser {
      * provided {@link MessageProcessor} will be invoked for each individual 
message.
      */
     public static void parseMessage(TopicName topicName, long ledgerId, long 
entryId, ByteBuf headersAndPayload,
-            MessageProcessor processor) throws IOException {
+            MessageProcessor processor, int maxMessageSize) throws IOException 
{
         MessageMetadata msgMetadata = null;
         ByteBuf payload = headersAndPayload;
         ByteBuf uncompressedPayload = null;
@@ -74,7 +74,7 @@ public class MessageParser {
             }
 
             uncompressedPayload = uncompressPayloadIfNeeded(topicName, 
msgMetadata, headersAndPayload, ledgerId,
-                    entryId);
+                    entryId, maxMessageSize);
 
             if (uncompressedPayload == null) {
                 // Message was discarded on decompression error
@@ -115,11 +115,11 @@ public class MessageParser {
     }
 
     public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, 
MessageMetadata msgMetadata,
-            ByteBuf payload, long ledgerId, long entryId) {
+            ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) {
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
         int uncompressedSize = msgMetadata.getUncompressedSize();
         int payloadSize = payload.readableBytes();
-        if (payloadSize > PulsarDecoder.MaxMessageSize) {
+        if (payloadSize > maxMessageSize) {
             // payload size is itself corrupted since it cannot be bigger than 
the MaxMessageSize
             log.error("[{}] Got corrupted payload message size {} at {}:{}", 
topic, payloadSize,
                     ledgerId, entryId);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index d73e601..11d69be 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -213,6 +213,7 @@ message CommandConnect {
 message CommandConnected {
        required string server_version = 1;
        optional int32 protocol_version = 2 [default = 0];
+       optional int32 max_message_size = 3;
 }
 
 message CommandAuthResponse {
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
index 0f33462..b89a028 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -36,6 +36,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
index 212a740..50d698d 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pulsar.discovery.service;
 
-import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.util.NettySslContextBuilder;
-import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
 
 import io.netty.channel.ChannelInitializer;
@@ -63,7 +62,8 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
                 ch.pipeline().addLast(TLS_HANDLER, 
sslContext.newHandler(ch.alloc()));
             }
         }
-        ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+            Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new 
ServerConnection(discoveryService));
     }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index ffa4c2c..a8deb44 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,6 +38,7 @@ import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,8 +98,8 @@ public class DirectProxyHandler {
                 if (sslCtx != null) {
                     ch.pipeline().addLast(TLS_HANDLER, 
sslCtx.newHandler(ch.alloc()));
                 }
-                ch.pipeline().addLast("frameDecoder",
-                        new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+                ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(
+                    Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
                 ch.pipeline().addLast("proxyOutboundHandler", new 
ProxyBackendHandler(config, protocolVersion));
             }
         });
@@ -259,7 +259,15 @@ public class DirectProxyHandler {
 
             state = BackendState.HandshakeCompleted;
 
-            
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future
 -> {
+            ChannelFuture channelFuture;
+            if (connected.hasMaxMessageSize()) {
+                channelFuture = inboundChannel.writeAndFlush(
+                    Commands.newConnected(connected.getProtocolVersion(), 
connected.getMaxMessageSize()));
+            } else {
+                channelFuture = 
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
+            }
+
+            channelFuture.addListener(future -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Removing decoder from pipeline", 
inboundChannel, outboundChannel);
                 }
@@ -270,8 +278,34 @@ public class DirectProxyHandler {
                 } else {
                     // Enable parsing feature, proxyLogLevel(1 or 2)
                     // Add parser handler
-                    inboundChannel.pipeline().addBefore("handler" , 
"inboundParser" , new ParserProxyHandler(inboundChannel , 
ParserProxyHandler.FRONTEND_CONN));
-                    
outboundChannel.pipeline().addBefore("proxyOutboundHandler" , "outboundParser" 
, new ParserProxyHandler(outboundChannel , ParserProxyHandler.BACKEND_CONN));
+                    if (connected.hasMaxMessageSize()) {
+                        inboundChannel.pipeline().replace("frameDecoder", 
"newFrameDecoder",
+                                                          new 
LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+                                                                               
            + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                                                               
            0, 4, 0, 4));
+                        outboundChannel.pipeline().replace("frameDecoder", 
"newFrameDecoder",
+                                                           new 
LengthFieldBasedFrameDecoder(
+                                                               
connected.getMaxMessageSize()
+                                                               + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+
+                        inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
+                                                            new 
ParserProxyHandler(inboundChannel,
+                                                                               
    ParserProxyHandler.FRONTEND_CONN,
+                                                                               
    connected.getMaxMessageSize()));
+                        
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                                                             new 
ParserProxyHandler(outboundChannel,
+                                                                               
     ParserProxyHandler.BACKEND_CONN,
+                                                                               
     connected.getMaxMessageSize()));
+                    } else {
+                        inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
+                                                            new 
ParserProxyHandler(inboundChannel,
+                                                                               
    ParserProxyHandler.FRONTEND_CONN,
+                                                                               
    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                        
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                                                             new 
ParserProxyHandler(outboundChannel,
+                                                                               
     ParserProxyHandler.BACKEND_CONN,
+                                                                               
     Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                    }
                 }
                 // Start reading from both connections
                 inboundChannel.read();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 040a834..8b4fe64 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -51,15 +51,18 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
 
     private String connType;
 
+    private int maxMessageSize;
+
 
     //producerid+channelid as key
     //or consumerid+channelid as key
     private static Map<String, String> producerHashMap = new 
ConcurrentHashMap<>();
     private static Map<String, String> consumerHashMap = new 
ConcurrentHashMap<>();
 
-    public ParserProxyHandler(Channel channel, String type){
+    public ParserProxyHandler(Channel channel, String type, int 
maxMessageSize){
         this.channel = channel;
         this.connType=type;
+        this.maxMessageSize = maxMessageSize;
     }
 
     private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype, 
String info, List<RawMessage> messages) throws Exception{
@@ -118,7 +121,7 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
                     MessageParser.parseMessage(topicName,  -1L,
                             -1L,buffer,(message) -> {
                                 messages.add(message);
-                            });
+                            }, maxMessageSize);
 
                     logging(ctx.channel() , cmd.getType() , "" , messages);
                     break;
@@ -138,7 +141,7 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
                     MessageParser.parseMessage(topicName,  -1L,
                                 -1L,buffer,(message) -> {
                                     messages.add(message);
-                                });
+                                }, maxMessageSize);
 
 
                     logging(ctx.channel() , cmd.getType() , "" , messages);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index b03afbf..9be109c 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -22,10 +22,9 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.util.ClientSslContextRefresher;
 import org.apache.pulsar.common.util.NettySslContextBuilder;
-import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
@@ -85,7 +84,8 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
             }
         }
 
-        ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+            Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         ch.pipeline().addLast("handler",
                 new ProxyConnection(proxyService, clientSslCtxRefresher == 
null ? null : clientSslCtxRefresher.get()));
     }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 2399233..d36c90d 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,6 +23,7 @@ import io.airlift.configuration.Config;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.common.api.Commands;
 
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
@@ -37,6 +38,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private int targetNumSplits = 2;
     private int maxSplitMessageQueueSize = 10000;
     private int maxSplitEntryQueueSize = 1000;
+    private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
     private String statsProvider = NullStatsProvider.class.getName();
     private Map<String, String> statsProviderConfigs = new HashMap<>();
 
@@ -59,6 +61,16 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    @Config("pulsar.max-message-size")
+    public PulsarConnectorConfig setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+        return this;
+    }
+
+    public int getMaxMessageSize() {
+        return this.maxMessageSize;
+    }
+
     @NotNull
     public String getZookeeperUri() {
         return this.zookeeperUri;
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 4ffbd2f..09f3fa6 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -138,6 +138,7 @@ public class PulsarRecordCursor implements RecordCursor {
                 pulsarSplit.getTableName());
         this.metricsTracker = pulsarConnectorMetricsTracker;
         this.readOffloaded = 
pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
+        this.pulsarConnectorConfig = pulsarConnectorConfig;
 
         Schema schema = 
PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
 
@@ -260,7 +261,7 @@ public class PulsarRecordCursor implements RecordCursor {
                                             } catch (InterruptedException e) {
                                                 //no-op
                                             }
-                                        });
+                                        }, 
pulsarConnectorConfig.getMaxMessageSize());
                             } catch (IOException e) {
                                 log.error(e, "Failed to parse message from 
pulsar topic %s", topicName.toString());
                                 throw new RuntimeException(e);

Reply via email to