This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new da1cd78 [WIP] PIP-36: Support set message size in broker.conf (#4247)
da1cd78 is described below
commit da1cd78d82c3d24b38805b7c34d3852d9ec73b63
Author: Yong Zhang <[email protected]>
AuthorDate: Thu May 16 05:21:29 2019 +0800
[WIP] PIP-36: Support set message size in broker.conf (#4247)
* Support set message size
---
*Motivation*
Currently Pulsar only supports messages up to 5MB in size. But there are many
cases that need to transfer messages
larger than 5MB.
https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
*Modifications*
- Add message size in protocol
- Automatically adjust client message size by server
* Use `maxMessageSize` to set `nettyFrameSize` in bookie client
---
*Motivation*
When the broker specifies a `maxMessageSize`, the bookie should accept this value as
`nettyFrameSize`
*Modifications*
- Use `cnx().getMaxMessageSize`
- The discovery service only redirects, so it uses the constant value `5 * 1024 *
1024` as message size
- Put `MAX_METADATA_SIZE` as constant value in `InternalConfigurationData`
* Use `Commands` to store message setting
---
*Modifications*
- use `Commands` to store default `MAX_MESSAGE_SIZE` and
`MESSAGE_SIZE_FRAME_PADDING`
- replace `LengthFieldBasedFrameDecoder` when a message size has been set
- replace `PulsarDecoder.MaxMessageSize`
* Fix some error
* Fix license header
* Add test and make `ClientCnx.maxMessageSize` static
---
*Motivation*
- Even if the cnx can't be used, `maxMessageSize` should still be used when comparing
message sizes, so it should be a static variable
* fix code style
* Fix license header
---
.../apache/pulsar/broker/ServiceConfiguration.java | 8 ++
.../org/apache/pulsar/PulsarBrokerStarter.java | 8 ++
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 3 +
.../broker/service/PulsarChannelInitializer.java | 8 +-
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
.../pulsar/broker/service/MaxMessageSizeTest.java | 154 +++++++++++++++++++++
.../client/api/SimpleProducerConsumerTest.java | 14 +-
.../client/api/v1/V1_ProducerConsumerTest.java | 5 +-
.../pulsar/client/impl/MessageParserTest.java | 5 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 16 ++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 5 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 16 +--
.../client/impl/PulsarChannelInitializer.java | 9 +-
.../org/apache/pulsar/common/api/Commands.java | 14 +-
.../apache/pulsar/common/api/PulsarDecoder.java | 5 -
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 ++++++++
.../pulsar/common/api/raw/MessageParser.java | 8 +-
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
.../pulsar/discovery/service/ServerConnection.java | 1 +
.../service/ServiceChannelInitializer.java | 6 +-
.../pulsar/proxy/server/DirectProxyHandler.java | 46 +++++-
.../pulsar/proxy/server/ParserProxyHandler.java | 9 +-
.../proxy/server/ServiceChannelInitializer.java | 6 +-
.../pulsar/sql/presto/PulsarConnectorConfig.java | 12 ++
.../pulsar/sql/presto/PulsarRecordCursor.java | 3 +-
25 files changed, 369 insertions(+), 54 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5428d55..5209e31 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -33,6 +33,8 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -489,6 +491,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ " Using a value of 0, is disabling
maxConsumersPerSubscription-limit check.")
private int maxConsumersPerSubscription = 0;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max size of messages.",
+ maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
+ private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+
/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 3ef104f..8f291ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -45,10 +45,13 @@ import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.common.util.ReflectionUtils;
+import org.apache.bookkeeper.util.DirectMemoryUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.slf4j.Logger;
@@ -139,6 +142,11 @@ public class PulsarBrokerStarter {
brokerConfig = loadConfig(starterArguments.brokerConfigFile);
}
+ int maxFrameSize = brokerConfig.getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING;
+ if (maxFrameSize >= DirectMemoryUtils.maxDirectMemory()) {
+ throw new IllegalArgumentException("Max message size need
smaller than jvm directMemory");
+ }
+
// init functions worker
if (starterArguments.runFunctionsWorker ||
brokerConfig.isFunctionsWorkerEnabled()) {
WorkerConfig workerConfig;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index f15b9a3..65207e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -28,6 +28,8 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
@@ -57,6 +59,7 @@ public class BookKeeperClientFactoryImpl implements
BookKeeperClientFactory {
bkConf.setUseV2WireProtocol(conf.isBookkeeperUseV2WireProtocol());
bkConf.setEnableDigestTypeAutodetection(true);
bkConf.setStickyReadsEnabled(conf.isBookkeeperEnableStickyReads());
+ bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING);
bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
if (conf.isBookkeeperClientHealthCheckEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index ff24caa..f3e8214 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.broker.service;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.ByteBufPair;
-import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.util.NettySslContextBuilder;
import io.netty.channel.ChannelInitializer;
@@ -35,6 +36,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
private final PulsarService pulsar;
private final boolean enableTls;
private final NettySslContextBuilder sslCtxRefresher;
+ private final ServiceConfiguration brokerConf;
/**
*
@@ -54,6 +56,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
} else {
this.sslCtxRefresher = null;
}
+ this.brokerConf = pulsar.getConfiguration();
}
@Override
@@ -65,7 +68,8 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
- ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+ ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ brokerConf.getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerCnx(pulsar));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6fe3864..b5b6fe9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -136,6 +136,7 @@ public class ServerCnx extends PulsarHandler {
private boolean authenticateOriginalAuthData;
private final boolean schemaValidationEnforced;
private String authMethod = "none";
+ private final int maxMessageSize;
enum State {
Start, Connected, Failed, Connecting
@@ -156,6 +157,7 @@ public class ServerCnx extends PulsarHandler {
this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
this.authenticateOriginalAuthData =
service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
this.schemaValidationEnforced =
pulsar.getConfiguration().isSchemaValidationEnforced();
+ this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
}
@Override
@@ -455,7 +457,7 @@ public class ServerCnx extends PulsarHandler {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion)
{
- ctx.writeAndFlush(Commands.newConnected(clientProtoVersion));
+ ctx.writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize));
state = State.Connected;
remoteEndpointProtocolVersion = clientProtoVersion;
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /*
ignore default version: pulsar client */) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
new file mode 100644
index 0000000..4fb3183
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MaxMessageSizeTest {
+ private static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
+ PulsarService pulsar;
+ ServiceConfiguration configuration;
+
+ PulsarAdmin admin;
+
+ LocalBookkeeperEnsemble bkEnsemble;
+
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final int BROKER_WEBSERVER_PORT = PortManager.nextFreePort();
+
+ @BeforeMethod
+ void setup() {
+ try {
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT,
PortManager::nextFreePort);
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024);
+ bkEnsemble.startStandalone(conf, false);
+
+ configuration = new ServiceConfiguration();
+ configuration.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
+ configuration.setAdvertisedAddress("localhost");
+ configuration.setWebServicePort(BROKER_WEBSERVER_PORT);
+ configuration.setClusterName("max_message_test");
+ configuration.setBrokerServicePort(BROKER_SERVICE_PORT);
+ configuration.setAuthorizationEnabled(false);
+ configuration.setAuthenticationEnabled(false);
+ configuration.setManagedLedgerMaxEntriesPerLedger(5);
+ configuration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ configuration.setMaxMessageSize(10 * 1024 * 1024);
+
+ pulsar = new PulsarService(configuration);
+ pulsar.start();
+
+ String url = "http://127.0.0.1:" + BROKER_WEBSERVER_PORT;
+ admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
+ admin.clusters().createCluster("max_message_test", new
ClusterData(url));
+ admin.tenants()
+ .createTenant("test", new
TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
+ admin.namespaces().createNamespace("test/message",
Sets.newHashSet("max_message_test"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @AfterMethod
+ void shutdown() {
+ try {
+ pulsar.close();
+ bkEnsemble.stop();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testMaxMessageSetting() throws PulsarClientException {
+
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:" +
BROKER_SERVICE_PORT).build();
+ String topicName = "persistent://test/message/topic1";
+ Producer producer =
client.newProducer().topic(topicName).sendTimeout(60,
TimeUnit.SECONDS).create();
+ Consumer consumer =
client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
+
+ // less than 5MB message
+
+ byte[] normalMsg = new byte[2 * 1024 * 1024];
+
+ try {
+ producer.send(normalMsg);
+ } catch (PulsarClientException e) {
+ Assert.fail("Shouldn't have exception at here", e);
+ }
+
+ byte[] consumerNormalMsg = consumer.receive().getData();
+ Assert.assertEquals(normalMsg, consumerNormalMsg);
+
+ // equal 5MB message
+ byte[] limitMsg = new byte[5 * 1024 * 1024];
+ try {
+ producer.send(limitMsg);
+ } catch (PulsarClientException e) {
+ Assert.fail("Shouldn't have exception at here", e);
+ }
+
+ byte[] consumerLimitMsg = consumer.receive().getData();
+ Assert.assertEquals(limitMsg, consumerLimitMsg);
+
+ // less than 10MB message
+ byte[] newNormalMsg = new byte[8 * 1024 * 1024];
+ try {
+ producer.send(newNormalMsg);
+ } catch (PulsarClientException e) {
+ Assert.fail("Shouldn't have exception at here", e);
+ }
+
+ byte[] consumerNewNormalMsg = consumer.receive().getData();
+ Assert.assertEquals(newNormalMsg, consumerNewNormalMsg);
+
+ // equals 10MB message
+ byte[] newLimitMsg = new byte[10 * 1024 * 1024];
+ try {
+ producer.send(newLimitMsg);
+ Assert.fail("Shouldn't send out this message");
+ } catch (PulsarClientException e) {
+ // no-op
+ }
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer.close();
+ client.close();
+
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7192e7f..32c6fa8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -634,10 +634,10 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
// Messages are allowed up to MaxMessageSize
- producer.newMessage().value(new byte[PulsarDecoder.MaxMessageSize]);
+ producer.newMessage().value(new
byte[Commands.DEFAULT_MAX_MESSAGE_SIZE]);
try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
fail("Should have thrown exception");
} catch (PulsarClientException.InvalidMessageException e) {
// OK
@@ -671,7 +671,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4)
.create();
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
producer.close();
// (b) batch-msg with compression
@@ -680,7 +680,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4)
.create();
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
producer.close();
// (c) non-batch msg without compression
@@ -690,7 +690,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.compressionType(CompressionType.NONE)
.create();
try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
fail("Should have thrown exception");
} catch (PulsarClientException.InvalidMessageException e) {
// OK
@@ -704,7 +704,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4).create();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe();
- byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
+ byte[] content = new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 10];
producer.send(content);
assertEquals(consumer.receive().getData(), content);
producer.close();
@@ -716,7 +716,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.compressionType(CompressionType.NONE)
.create();
try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
fail("Should have thrown exception");
} catch (PulsarClientException.InvalidMessageException e) {
// OK
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index fecef79..2040979 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -607,10 +608,10 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
// Messages are allowed up to MaxMessageSize
- producer.newMessage().value(new byte[PulsarDecoder.MaxMessageSize]);
+ producer.newMessage().value(new
byte[Commands.DEFAULT_MAX_MESSAGE_SIZE]);
try {
- producer.send(new byte[PulsarDecoder.MaxMessageSize + 1]);
+ producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
fail("Should have thrown exception");
} catch (PulsarClientException.InvalidMessageException e) {
// OK
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 5ec6332..1e0c45c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -34,6 +34,7 @@ import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
@@ -91,7 +92,7 @@ public class MessageParserTest extends
MockedPulsarServiceBaseTest {
MessageParser.parseMessage(topicName, entry.getLedgerId(),
entry.getEntryId(), entry.getDataBuffer(),
(message) -> {
messages.add(message);
- });
+ }, Commands.DEFAULT_MAX_MESSAGE_SIZE);
} finally {
entry.release();
}
@@ -133,7 +134,7 @@ public class MessageParserTest extends
MockedPulsarServiceBaseTest {
MessageParser.parseMessage(topicName, entry.getLedgerId(),
entry.getEntryId(), entry.getDataBuffer(),
(message) -> {
messages.add(message);
- });
+ }, Commands.DEFAULT_MAX_MESSAGE_SIZE);
} finally {
entry.release();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 1a88afd..7a58357 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -30,6 +30,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;
@@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.net.ssl.SSLSession;
+import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.PulsarVersion;
@@ -117,6 +119,10 @@ public class ClientCnx extends PulsarHandler {
.newUpdater(ClientCnx.class, "numberOfRejectRequests");
@SuppressWarnings("unused")
private volatile int numberOfRejectRequests = 0;
+
+ @Getter
+ private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
private final int protocolVersion;
@@ -275,7 +281,15 @@ public class ClientCnx extends PulsarHandler {
}
checkArgument(state == State.SentConnectFrame || state ==
State.Connecting);
-
+ if (connected.hasMaxMessageSize()) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Connection has max message size setting, replace
old frameDecoder with "
+ + "server frame size {}", ctx.channel(),
connected.getMaxMessageSize());
+ }
+ maxMessageSize = connected.getMaxMessageSize();
+ ctx.pipeline().replace("frameDecoder", "newFrameDecoder", new
LengthFieldBasedFrameDecoder(
+ connected.getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ }
if (log.isDebugEnabled()) {
log.debug("{} Connection is ready", ctx.channel());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8909989..b2421f2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -66,7 +66,6 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
-import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
@@ -1141,10 +1140,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
- if (payloadSize > PulsarDecoder.MaxMessageSize) {
+ if (payloadSize > ClientCnx.getMaxMessageSize()) {
// payload size is itself corrupted since it cannot be bigger than
the MaxMessageSize
log.error("[{}][{}] Got corrupted payload message size {} at {}",
topic, subscription, payloadSize,
- messageId);
+ messageId);
discardCorruptedMessage(messageId, currentCnx,
ValidationError.UncompressedSizeCorruption);
return null;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b4a0993..3e32fb9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -61,7 +61,6 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
-import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -312,15 +311,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// validate msg-size (For batching this will be check at the batch
completion size)
int compressedSize = compressedPayload.readableBytes();
-
- if (compressedSize > PulsarDecoder.MaxMessageSize) {
+ if (compressedSize > ClientCnx.getMaxMessageSize()) {
compressedPayload.release();
String compressedStr = (!isBatchMessagingEnabled() &&
conf.getCompressionType() != CompressionType.NONE)
- ? "Compressed"
- : "";
+ ? "Compressed"
+ : "";
PulsarClientException.InvalidMessageException
invalidMessageException = new PulsarClientException.InvalidMessageException(
- format("%s Message payload size %d cannot exceed %d
bytes", compressedStr, compressedSize,
- PulsarDecoder.MaxMessageSize));
+ format("%s Message payload size %d cannot exceed %d
bytes", compressedStr, compressedSize,
+ ClientCnx.getMaxMessageSize()));
callback.sendComplete(invalidMessageException);
return;
}
@@ -1306,12 +1304,12 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
op = OpSendMsg.create(batchMessageContainer.messages, cmd,
sequenceId,
batchMessageContainer.firstCallback);
- if (encryptedPayload.readableBytes() >
PulsarDecoder.MaxMessageSize) {
+ if (encryptedPayload.readableBytes() >
ClientCnx.getMaxMessageSize()) {
cmd.release();
semaphore.release(numMessagesInBatch);
if (op != null) {
op.callback.sendComplete(new
PulsarClientException.InvalidMessageException(
- "Message size is bigger than " +
PulsarDecoder.MaxMessageSize + " bytes"));
+ "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
}
return;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 494cb12..5d0f872 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -29,6 +29,7 @@ import io.netty.handler.ssl.SslContext;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -38,6 +39,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
private final Supplier<ClientCnx> clientCnxSupplier;
private final SslContext sslCtx;
+ private final ClientConfigurationData conf;
public PulsarChannelInitializer(ClientConfigurationData conf,
Supplier<ClientCnx> clientCnxSupplier)
throws Exception {
@@ -57,6 +59,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
} else {
this.sslCtx = null;
}
+ this.conf = conf;
}
@Override
@@ -68,7 +71,11 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
- ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+ ch.pipeline()
+ .addLast("frameDecoder",
+ new LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING,
+ 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 3692e84..04018f6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -94,6 +94,11 @@ import
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
public class Commands {
+ // default message size for transfer
+ public static final int DEFAULT_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
+ public static final int MESSAGE_SIZE_FRAME_PADDING = 10 * 1024;
+ public static final int INVALID_MAX_MESSAGE_SIZE = -1;
+
public static final short magicCrc32c = 0x0e01;
private static final int checksumSize = 4;
@@ -189,9 +194,16 @@ public class Commands {
return res;
}
- public static ByteBuf newConnected(int clientProtocolVersion) {
+ public static ByteBuf newConnected(int clientProtocoVersion) {
+ return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE);
+ }
+
+ public static ByteBuf newConnected(int clientProtocolVersion, int
maxMessageSize) {
CommandConnected.Builder connectedBuilder =
CommandConnected.newBuilder();
connectedBuilder.setServerVersion("Pulsar Server");
+ if (INVALID_MAX_MESSAGE_SIZE != maxMessageSize) {
+ connectedBuilder.setMaxMessageSize(maxMessageSize);
+ }
// If the broker supports a newer version of the protocol, it will
anyway advertise the max version that the
// client supports, to avoid confusing the client.
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 0e1ea73..38ff90f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -66,11 +66,6 @@ import org.slf4j.LoggerFactory;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
- // Max message size is limited by max BookKeeper entry size which is 5MB,
and we need to account
- // for headers as well.
- public final static int MaxMessageSize = (5 * 1024 * 1024 - (10 * 1024));
- public final static int MaxFrameSize = 5 * 1024 * 1024;
-
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
// Get a buffer that contains the full frame
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 053ad19..319302d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6533,6 +6533,10 @@ public final class PulsarApi {
// optional int32 protocol_version = 2 [default = 0];
boolean hasProtocolVersion();
int getProtocolVersion();
+
+ // optional int32 max_message_size = 3 [default = 5242880];
+ boolean hasMaxMessageSize();
+ int getMaxMessageSize();
}
public static final class CommandConnected extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -6611,9 +6615,20 @@ public final class PulsarApi {
return protocolVersion_;
}
+ // optional int32 max_message_size = 3 [default = 5242880];
+ public static final int MAX_MESSAGE_SIZE_FIELD_NUMBER = 3;
+ private int maxMessageSize_;
+ public boolean hasMaxMessageSize() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public int getMaxMessageSize() {
+ return maxMessageSize_;
+ }
+
private void initFields() {
serverVersion_ = "";
protocolVersion_ = 0;
+ maxMessageSize_ = 5242880;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6642,6 +6657,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeInt32(2, protocolVersion_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt32(3, maxMessageSize_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -6658,6 +6676,10 @@ public final class PulsarApi {
size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeInt32Size(2, protocolVersion_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32Size(3, maxMessageSize_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -6775,6 +6797,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000001);
protocolVersion_ = 0;
bitField0_ = (bitField0_ & ~0x00000002);
+ maxMessageSize_ = 5242880;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -6816,6 +6840,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000002;
}
result.protocolVersion_ = protocolVersion_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.maxMessageSize_ = maxMessageSize_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -6828,6 +6856,9 @@ public final class PulsarApi {
if (other.hasProtocolVersion()) {
setProtocolVersion(other.getProtocolVersion());
}
+ if (other.hasMaxMessageSize()) {
+ setMaxMessageSize(other.getMaxMessageSize());
+ }
return this;
}
@@ -6871,6 +6902,11 @@ public final class PulsarApi {
protocolVersion_ = input.readInt32();
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ maxMessageSize_ = input.readInt32();
+ break;
+ }
}
}
}
@@ -6934,6 +6970,27 @@ public final class PulsarApi {
return this;
}
+ // optional int32 max_message_size = 3 [default = 5242880];
+ private int maxMessageSize_ = 5242880;
+ public boolean hasMaxMessageSize() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public int getMaxMessageSize() {
+ return maxMessageSize_;
+ }
+ public Builder setMaxMessageSize(int value) {
+ bitField0_ |= 0x00000004;
+ maxMessageSize_ = value;
+
+ return this;
+ }
+ public Builder clearMaxMessageSize() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ maxMessageSize_ = 5242880;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandConnected)
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 1f1d66e..da24a6c 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -50,7 +50,7 @@ public class MessageParser {
* provided {@link MessageProcessor} will be invoked for each individual
message.
*/
public static void parseMessage(TopicName topicName, long ledgerId, long
entryId, ByteBuf headersAndPayload,
- MessageProcessor processor) throws IOException {
+ MessageProcessor processor, int maxMessageSize) throws IOException
{
MessageMetadata msgMetadata = null;
ByteBuf payload = headersAndPayload;
ByteBuf uncompressedPayload = null;
@@ -74,7 +74,7 @@ public class MessageParser {
}
uncompressedPayload = uncompressPayloadIfNeeded(topicName,
msgMetadata, headersAndPayload, ledgerId,
- entryId);
+ entryId, maxMessageSize);
if (uncompressedPayload == null) {
// Message was discarded on decompression error
@@ -115,11 +115,11 @@ public class MessageParser {
}
public static ByteBuf uncompressPayloadIfNeeded(TopicName topic,
MessageMetadata msgMetadata,
- ByteBuf payload, long ledgerId, long entryId) {
+ ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) {
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
- if (payloadSize > PulsarDecoder.MaxMessageSize) {
+ if (payloadSize > maxMessageSize) {
// payload size is itself corrupted since it cannot be bigger than
the MaxMessageSize
log.error("[{}] Got corrupted payload message size {} at {}:{}",
topic, payloadSize,
ledgerId, entryId);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index d73e601..11d69be 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -213,6 +213,7 @@ message CommandConnect {
message CommandConnected {
required string server_version = 1;
optional int32 protocol_version = 2 [default = 0];
+ optional int32 max_message_size = 3 [default = 5242880]; // 5 MiB; same value as Commands.DEFAULT_MAX_MESSAGE_SIZE and the generated PulsarApi default
}
message CommandAuthResponse {
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
index 0f33462..b89a028 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -36,6 +36,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
index 212a740..50d698d 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -18,9 +18,8 @@
*/
package org.apache.pulsar.discovery.service;
-import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.util.NettySslContextBuilder;
-import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import io.netty.channel.ChannelInitializer;
@@ -63,7 +62,8 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.pipeline().addLast(TLS_HANDLER,
sslContext.newHandler(ch.alloc()));
}
}
- ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+ ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new
ServerConnection(discoveryService));
}
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index ffa4c2c..a8deb44 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.URI;
import java.net.URISyntaxException;
@@ -39,6 +38,7 @@ import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,8 +98,8 @@ public class DirectProxyHandler {
if (sslCtx != null) {
ch.pipeline().addLast(TLS_HANDLER,
sslCtx.newHandler(ch.alloc()));
}
- ch.pipeline().addLast("frameDecoder",
- new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+ ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("proxyOutboundHandler", new
ProxyBackendHandler(config, protocolVersion));
}
});
@@ -259,7 +259,15 @@ public class DirectProxyHandler {
state = BackendState.HandshakeCompleted;
-
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future
-> {
+ ChannelFuture channelFuture;
+ if (connected.hasMaxMessageSize()) {
+ channelFuture = inboundChannel.writeAndFlush(
+ Commands.newConnected(connected.getProtocolVersion(),
connected.getMaxMessageSize()));
+ } else {
+ channelFuture =
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
+ }
+
+ channelFuture.addListener(future -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removing decoder from pipeline",
inboundChannel, outboundChannel);
}
@@ -270,8 +278,34 @@ public class DirectProxyHandler {
} else {
// Enable parsing feature, proxyLogLevel(1 or 2)
// Add parser handler
- inboundChannel.pipeline().addBefore("handler" ,
"inboundParser" , new ParserProxyHandler(inboundChannel ,
ParserProxyHandler.FRONTEND_CONN));
-
outboundChannel.pipeline().addBefore("proxyOutboundHandler" , "outboundParser"
, new ParserProxyHandler(outboundChannel , ParserProxyHandler.BACKEND_CONN));
+ if (connected.hasMaxMessageSize()) {
+ inboundChannel.pipeline().replace("frameDecoder",
"newFrameDecoder",
+ new
LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+
+ Commands.MESSAGE_SIZE_FRAME_PADDING,
+
0, 4, 0, 4));
+ outboundChannel.pipeline().replace("frameDecoder",
"newFrameDecoder",
+ new
LengthFieldBasedFrameDecoder(
+
connected.getMaxMessageSize()
+ +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+
+ inboundChannel.pipeline().addBefore("handler",
"inboundParser",
+ new
ParserProxyHandler(inboundChannel,
+
ParserProxyHandler.FRONTEND_CONN,
+
connected.getMaxMessageSize()));
+
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+ new
ParserProxyHandler(outboundChannel,
+
ParserProxyHandler.BACKEND_CONN,
+
connected.getMaxMessageSize()));
+ } else {
+ inboundChannel.pipeline().addBefore("handler",
"inboundParser",
+ new
ParserProxyHandler(inboundChannel,
+
ParserProxyHandler.FRONTEND_CONN,
+
Commands.DEFAULT_MAX_MESSAGE_SIZE));
+
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+ new
ParserProxyHandler(outboundChannel,
+
ParserProxyHandler.BACKEND_CONN,
+
Commands.DEFAULT_MAX_MESSAGE_SIZE));
+ }
}
// Start reading from both connections
inboundChannel.read();
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 040a834..8b4fe64 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -51,15 +51,18 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
private String connType;
+ private int maxMessageSize;
+
//producerid+channelid as key
//or consumerid+channelid as key
private static Map<String, String> producerHashMap = new
ConcurrentHashMap<>();
private static Map<String, String> consumerHashMap = new
ConcurrentHashMap<>();
- public ParserProxyHandler(Channel channel, String type){
+ public ParserProxyHandler(Channel channel, String type, int
maxMessageSize){
this.channel = channel;
this.connType=type;
+ this.maxMessageSize = maxMessageSize;
}
private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype,
String info, List<RawMessage> messages) throws Exception{
@@ -118,7 +121,7 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
MessageParser.parseMessage(topicName, -1L,
-1L,buffer,(message) -> {
messages.add(message);
- });
+ }, maxMessageSize);
logging(ctx.channel() , cmd.getType() , "" , messages);
break;
@@ -138,7 +141,7 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
MessageParser.parseMessage(topicName, -1L,
-1L,buffer,(message) -> {
messages.add(message);
- });
+ }, maxMessageSize);
logging(ctx.channel() , cmd.getType() , "" , messages);
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index b03afbf..9be109c 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -22,10 +22,9 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.util.ClientSslContextRefresher;
import org.apache.pulsar.common.util.NettySslContextBuilder;
-import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
@@ -85,7 +84,8 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
}
}
- ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
+ ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new ProxyConnection(proxyService, clientSslCtxRefresher ==
null ? null : clientSslCtxRefresher.get()));
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 2399233..d36c90d 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,6 +23,7 @@ import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.common.api.Commands;
import javax.validation.constraints.NotNull;
import java.io.IOException;
@@ -37,6 +38,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int targetNumSplits = 2;
private int maxSplitMessageQueueSize = 10000;
private int maxSplitEntryQueueSize = 1000;
+ private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
private String statsProvider = NullStatsProvider.class.getName();
private Map<String, String> statsProviderConfigs = new HashMap<>();
@@ -59,6 +61,16 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
+ @Config("pulsar.max-message-size")
+ public PulsarConnectorConfig setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ return this;
+ }
+
+ public int getMaxMessageSize() {
+ return this.maxMessageSize;
+ }
+
@NotNull
public String getZookeeperUri() {
return this.zookeeperUri;
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 4ffbd2f..09f3fa6 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -138,6 +138,7 @@ public class PulsarRecordCursor implements RecordCursor {
pulsarSplit.getTableName());
this.metricsTracker = pulsarConnectorMetricsTracker;
this.readOffloaded =
pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
+ this.pulsarConnectorConfig = pulsarConnectorConfig;
Schema schema =
PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
@@ -260,7 +261,7 @@ public class PulsarRecordCursor implements RecordCursor {
} catch (InterruptedException e) {
//no-op
}
- });
+ },
pulsarConnectorConfig.getMaxMessageSize());
} catch (IOException e) {
log.error(e, "Failed to parse message from
pulsar topic %s", topicName.toString());
throw new RuntimeException(e);