This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 71598c11637 [fix][client]Fixed getting an incorrect `maxMessageSize`
value when accessing multiple clusters in the same process (#22306)
71598c11637 is described below
commit 71598c1163730defb9fdea85e813fe863c3fe4d2
Author: atomchen <[email protected]>
AuthorDate: Thu Mar 21 17:30:40 2024 +0800
[fix][client]Fixed getting an incorrect `maxMessageSize` value when
accessing multiple clusters in the same process (#22306)
Co-authored-by: atomchchen <[email protected]>
---
.../client/api/SimpleProducerConsumerTest.java | 6 ++--
.../client/impl/ProducerMemoryLimitTest.java | 12 ++++----
.../pulsar/client/impl/ProducerSemaphoreTest.java | 18 ++++++------
.../client/impl/AbstractBatchMessageContainer.java | 9 ++++--
.../client/impl/BatchMessageContainerImpl.java | 10 +++----
.../org/apache/pulsar/client/impl/ClientCnx.java | 3 +-
.../pulsar/client/impl/ConnectionHandler.java | 7 +++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 32 ++++++++++++++--------
9 files changed, 60 insertions(+), 40 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4536bda907b..4c106d39e7a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -93,13 +93,13 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.topic("persistent://my-property/my-ns/my-topic2");
@Cleanup
- Producer<byte[]> producer = producerBuilder.create();
+ ProducerImpl<byte[]> producer =
(ProducerImpl<byte[]>)producerBuilder.create();
List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
- byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
+ byte[] message = new
byte[producer.getConnectionHandler().getMaxMessageSize() + 1];
for (int i = 0; i < maxPendingMessages + 10; i++) {
Future<MessageId> future = producer.sendAsync(message);
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index d776fdb0ed9..55a67ae644d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
-import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
.create();
this.stopBroker();
try {
- try (MockedStatic<ClientCnx> mockedStatic =
Mockito.mockStatic(ClientCnx.class)) {
- mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
- producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
- }
+ ConnectionHandler connectionHandler =
Mockito.spy(producer.getConnectionHandler());
+ Field field =
producer.getClass().getDeclaredField("connectionHandler");
+ field.setAccessible(true);
+ field.set(producer, connectionHandler);
+ when(connectionHandler.getMaxMessageSize()).thenReturn(8);
+ producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 2f8cb655401..42f431e0b9b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.FutureUtil;
-import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -72,13 +72,14 @@ public class ProducerSemaphoreTest extends
ProducerConsumerBase {
.maxPendingMessages(pendingQueueSize)
.enableBatching(true)
.create();
-
this.stopBroker();
try {
- try (MockedStatic<ClientCnx> mockedStatic =
Mockito.mockStatic(ClientCnx.class)) {
- mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
-
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
- }
+ ConnectionHandler connectionHandler =
Mockito.spy(producer.getConnectionHandler());
+ Field field =
producer.getClass().getDeclaredField("connectionHandler");
+ field.setAccessible(true);
+ field.set(producer, connectionHandler);
+ when(connectionHandler.getMaxMessageSize()).thenReturn(2);
+ producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(),
pendingQueueSize);
@@ -86,10 +87,7 @@ public class ProducerSemaphoreTest extends
ProducerConsumerBase {
producer.conf.setBatchingEnabled(false);
try {
- try (MockedStatic<ClientCnx> mockedStatic =
Mockito.mockStatic(ClientCnx.class)) {
- mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
-
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
- }
+ producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(),
pendingQueueSize);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 8c17d8fcb25..3ba7866350a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
/**
* Batch message container framework.
@@ -59,14 +60,18 @@ public abstract class AbstractBatchMessageContainer
implements BatchMessageConta
public boolean haveEnoughSpace(MessageImpl<?> msg) {
int messageSize = msg.getDataBuffer().readableBytes();
return (
- (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <=
ClientCnx.getMaxMessageSize())
+ (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <=
getMaxMessageSize())
|| (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes)
<= maxBytesInBatch)
) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch <
maxNumMessagesInBatch);
}
+ protected int getMaxMessageSize() {
+ return producer != null && producer.getConnectionHandler() != null
+ ? producer.getConnectionHandler().getMaxMessageSize() :
Commands.DEFAULT_MAX_MESSAGE_SIZE;
+ }
protected boolean isBatchFull() {
return (maxBytesInBatch > 0 && currentBatchSizeBytes >=
maxBytesInBatch)
- || (maxBytesInBatch <= 0 && currentBatchSizeBytes >=
ClientCnx.getMaxMessageSize())
+ || (maxBytesInBatch <= 0 && currentBatchSizeBytes >=
getMaxMessageSize())
|| (maxNumMessagesInBatch > 0 && numMessagesInBatch >=
maxNumMessagesInBatch);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index bf8c1f9de82..fc5c3a3c679 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -101,7 +101,7 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
lowestSequenceId =
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = allocator.buffer(
- Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
+ Math.min(maxBatchSize, getMaxMessageSize()));
updateAndReserveBatchAllocatedSize(batchedMessageMetadataAndPayload.capacity());
if (msg.getMessageBuilder().hasTxnidMostBits() &&
currentTxnidMostBits == -1) {
currentTxnidMostBits =
msg.getMessageBuilder().getTxnidMostBits();
@@ -272,12 +272,12 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
op.setBatchSizeByte(encryptedPayload.readableBytes());
// handle mgs size check as non-batched in
`ProducerImpl.isMessageSizeExceeded`
- if (op.getMessageHeaderAndPayloadSize() >
ClientCnx.getMaxMessageSize()) {
+ if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
producer.semaphoreRelease(1);
producer.client.getMemoryLimitController().releaseMemory(
messages.get(0).getUncompressedSize() +
batchAllocatedSizeBytes);
discard(new PulsarClientException.InvalidMessageException(
- "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ "Message size is bigger than " + getMaxMessageSize() + "
bytes"));
return null;
}
lowestSequenceId = -1L;
@@ -285,13 +285,13 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
}
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
- if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
messages.forEach(msg -> producer.client.getMemoryLimitController()
.releaseMemory(msg.getUncompressedSize()));
producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes);
discard(new PulsarClientException.InvalidMessageException(
- "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ "Message size is bigger than " + getMaxMessageSize() + "
bytes"));
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index b3444ae393e..938a0b4d8f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -168,8 +168,7 @@ public class ClientCnx extends PulsarHandler {
private volatile int numberOfRejectRequests = 0;
@Getter
- private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
-
+ private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
protected final int protocolVersion;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 7700596dca3..f0f78420115 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -26,8 +26,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
+import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,10 @@ public class ConnectionHandler {
AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class,
ClientCnx.class, "clientCnx");
@SuppressWarnings("unused")
private volatile ClientCnx clientCnx = null;
+ @Getter
+ @Setter
+ // Since the `clientCnx` variable will be set to null at some times, it is
necessary to save this value here.
+ private volatile int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
protected final HandlerState state;
protected final Backoff backoff;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c09e0afe58d..6c2ded819a5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -776,6 +776,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
+ getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
final State state = getState();
if (state == State.Closing || state == State.Closed) {
@@ -1896,7 +1897,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
- if (checkMaxMessageSize && payloadSize >
ClientCnx.getMaxMessageSize()) {
+ if (checkMaxMessageSize && payloadSize >
getConnectionHandler().getMaxMessageSize()) {
// payload size is itself corrupted since it cannot be bigger than
the MaxMessageSize
log.error("[{}][{}] Got corrupted payload message size {} at {}",
topic, subscription, payloadSize,
messageId);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index da73514deb3..880185f7a97 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -180,9 +180,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.userProvidedProducerName = StringUtils.isNotBlank(producerName);
this.partitionIndex = partitionIndex;
this.pendingMessages = createPendingMessagesQueue();
- this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
- ? Math.min(conf.getChunkMaxMessageSize(),
ClientCnx.getMaxMessageSize())
- : ClientCnx.getMaxMessageSize();
if (conf.getMaxPendingMessages() > 0) {
this.semaphore = Optional.of(new
Semaphore(conf.getMaxPendingMessages(), true));
} else {
@@ -275,10 +272,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() -
100), TimeUnit.MILLISECONDS)
.create(),
this);
-
+ setChunkMaxMessageSize();
grabCnx();
}
+ private void setChunkMaxMessageSize() {
+ this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
+ ? Math.min(conf.getChunkMaxMessageSize(), getMaxMessageSize())
+ : getMaxMessageSize();
+ }
+
protected void semaphoreRelease(final int releaseCountRequest) {
if (semaphore.isPresent()) {
if (!errorState) {
@@ -455,14 +458,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// validate msg-size (For batching this will be check at the batch
completion size)
int compressedSize = compressedPayload.readableBytes();
- if (compressedSize > ClientCnx.getMaxMessageSize() &&
!this.conf.isChunkingEnabled()) {
+ if (compressedSize > getMaxMessageSize() &&
!this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = conf.getCompressionType() !=
CompressionType.NONE ? "Compressed" : "";
PulsarClientException.InvalidMessageException
invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends
a %s message with %d bytes that exceeds"
+ " %d bytes",
- producerName, topic, compressedStr, compressedSize,
ClientCnx.getMaxMessageSize()));
+ producerName, topic, compressedStr, compressedSize,
getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize,
callback, invalidMessageException);
return;
}
@@ -492,19 +495,19 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
int payloadChunkSize;
if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
totalChunks = 1;
- payloadChunkSize = ClientCnx.getMaxMessageSize();
+ payloadChunkSize = getMaxMessageSize();
} else {
// Reserve current metadata size for chunk size to avoid message
size overflow.
// NOTE: this is not strictly bounded, as metadata will be updated
after chunking.
// So there is a small chance that the final message size is
larger than ClientCnx.getMaxMessageSize().
// But it won't cause produce failure as broker have 10 KB padding
space for these cases.
- payloadChunkSize = ClientCnx.getMaxMessageSize() -
msgMetadata.getSerializedSize();
+ payloadChunkSize = getMaxMessageSize() -
msgMetadata.getSerializedSize();
if (payloadChunkSize <= 0) {
PulsarClientException.InvalidMessageException
invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends
a message with %d bytes metadata that "
+ "exceeds %d bytes",
producerName, topic,
- msgMetadata.getSerializedSize(),
ClientCnx.getMaxMessageSize()));
+ msgMetadata.getSerializedSize(),
getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize,
callback, invalidMessageException);
compressedPayload.release();
return;
@@ -1663,7 +1666,8 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
- chunkMaxMessageSize = Math.min(chunkMaxMessageSize,
ClientCnx.getMaxMessageSize());
+ getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
+ setChunkMaxMessageSize();
final long epoch;
synchronized (this) {
@@ -2323,11 +2327,11 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
private boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && !conf.isChunkingEnabled()) {
int messageSize = op.getMessageHeaderAndPayloadSize();
- if (messageSize > ClientCnx.getMaxMessageSize()) {
+ if (messageSize > getMaxMessageSize()) {
releaseSemaphoreForSendOp(op);
op.sendComplete(new
PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a
message with %d bytes that exceeds %d bytes",
- producerName, topic, messageSize,
ClientCnx.getMaxMessageSize()),
+ producerName, topic, messageSize,
getMaxMessageSize()),
op.sequenceId));
return true;
}
@@ -2335,6 +2339,10 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return false;
}
+ private int getMaxMessageSize() {
+ return getConnectionHandler().getMaxMessageSize();
+ }
+
public long getDelayInMillis() {
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg != null) {