This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3958aa642de [improve][client]PIP-189: No batching if only one message
in batch (#16605)
3958aa642de is described below
commit 3958aa642de38e5d30e801f49903e8d60077fc90
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Wed Aug 24 21:51:56 2022 +0800
[improve][client]PIP-189: No batching if only one message in batch (#16605)
[improve][client]PIP-189: No batching if only one message in batch #16605
### Motivation
* See https://github.com/apache/pulsar/issues/16619
### Modifications
* See https://github.com/apache/pulsar/issues/16619
* Most of the Modifications are relevant to `BatchMessageContainerImpl`
* Of course there are some tests about batching need to be modified,
because batched producer can also pubulish non-batched messages when this PIP
applies. The tests include:
* `RGUsageMTAggrWaitForAllMsgsTest`
* `BatchMessageTest`
* `BrokerEntryMetadataE2ETest`
* `ClientDeduplicationTest`
* `TopicReaderTest`
* `PulsarClientToolTest`
---
.../RGUsageMTAggrWaitForAllMsgsTest.java | 4 +-
.../pulsar/broker/service/BatchMessageTest.java | 37 +++++++-
.../broker/service/BrokerEntryMetadataE2ETest.java | 67 ++++++++-----
.../pulsar/client/api/ClientDeduplicationTest.java | 7 +-
.../apache/pulsar/client/api/TopicReaderTest.java | 6 +-
.../client/cli/PulsarClientToolForceBatchNum.java | 104 +++++++++++++++++++++
.../pulsar/client/cli/PulsarClientToolTest.java | 9 +-
.../client/impl/BatchMessageContainerImpl.java | 38 +++++++-
.../apache/pulsar/client/impl/ProducerImpl.java | 3 +
9 files changed, 240 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 27f9e905262..1acc5ad0039 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -769,8 +769,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
Assert.assertNotEquals(ninthPercentileValue, 0);
}
- // Empirically, there appears to be a 42-byte overhead for metadata,
imposed by Pulsar runtime.
- private static final int PER_MESSAGE_METADATA_OHEAD = 42;
+ // Empirically, there appears to be a 31-byte overhead for metadata,
imposed by Pulsar runtime.
+ private static final int PER_MESSAGE_METADATA_OHEAD = 31;
private static final int PUBLISH_INTERVAL_SECS = 10;
private static final int NUM_PRODUCERS = 4;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 0d18e243884..e9c5032063f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
@@ -865,6 +866,37 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
+ @Test(dataProvider = "containerBuilder")
+ public void testBatchSendOneMessage(BatcherBuilder builder) throws
Exception {
+ final String topicName =
"persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
+ final String subscriptionName = "sub-1";
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxPublishDelay(1,
TimeUnit.SECONDS).batchingMaxMessages(10).enableBatching(true)
+ .batcherBuilder(builder)
+ .create();
+ String msg = "my-message";
+ MessageId messageId =
producer.newMessage().value(msg.getBytes()).property("key1", "value1").send();
+
+ Assert.assertTrue(messageId instanceof MessageIdImpl);
+ Assert.assertFalse(messageId instanceof BatchMessageIdImpl);
+
+ Message<byte[]> received = consumer.receive();
+ assertEquals(received.getSequenceId(), 0);
+ consumer.acknowledge(received);
+
+ Assert.assertEquals(new String(received.getData()), msg);
+ Assert.assertFalse(received.getProperties().isEmpty());
+ Assert.assertEquals(received.getProperties().get("key1"), "value1");
+ Assert.assertFalse(received.getMessageId() instanceof
BatchMessageIdImpl);
+
+ producer.close();
+ consumer.close();
+ }
+
@Test(dataProvider = "containerBuilder")
public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws
Exception {
@@ -1034,7 +1066,10 @@ public class BatchMessageTest extends BrokerTestBase {
if (enableBatch) {
// only ack messages which batch index < 2, which means we
will not to ack the
// whole batch for the batch that with more than 2 messages
- if (((BatchMessageIdImpl)
message.getMessageId()).getBatchIndex() < 2) {
+ if ((message.getMessageId() instanceof BatchMessageIdImpl)
+ && ((BatchMessageIdImpl)
message.getMessageId()).getBatchIndex() < 2) {
+ consumer.acknowledgeAsync(message).get();
+ } else if (!(message.getMessageId() instanceof
BatchMessageIdImpl)){
consumer.acknowledgeAsync(message).get();
}
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 49b4742b71d..33785ba8795 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.broker.service;
import static
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.util.Sets;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -211,57 +214,75 @@ public class BrokerEntryMetadataE2ETest extends
BrokerTestBase {
final String topic = newTopicName();
final String subscription = "my-sub";
final long eventTime= 200;
+ final int msgNum = 2;
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
+ // make sure 2 messages in one batch, because if only one
message in batch,
+ // producer will not send batched messages
+ .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+ .batchingMaxMessages(msgNum)
+ .batchingMaxBytes(Integer.MAX_VALUE)
.enableBatching(true)
.create();
long sendTime = System.currentTimeMillis();
- // send message which is batch message and only contains one message,
so do not set the deliverAtTime
- MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
+ // send message which is batch message, so do not set the deliverAtTime
+ List<CompletableFuture<MessageId>> messageIdsFuture = new
ArrayList<>(msgNum);
+ for (int i = 0; i < msgNum; ++i) {
+ CompletableFuture<MessageId> messageId = producer.newMessage()
.eventTime(eventTime)
- .value(("hello").getBytes())
- .send();
+ .value(("hello" + i).getBytes())
+ .sendAsync();
+ messageIdsFuture.add(messageId);
+ }
+ FutureUtil.waitForAll(messageIdsFuture);
// 1. test for peekMessages
admin.topics().createSubscription(topic, subscription,
MessageId.earliest);
- final List<Message<byte[]>> messages =
admin.topics().peekMessages(topic, subscription, 1);
- Assert.assertEquals(messages.size(), 1);
-
- MessageImpl message = (MessageImpl) messages.get(0);
- Assert.assertEquals(message.getData(), ("hello").getBytes());
- Assert.assertTrue(message.getPublishTime() >= sendTime);
- BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
- Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
- Assert.assertEquals(entryMetadata.getIndex(), 0);
- System.out.println(message.getProperties());
-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
- // make sure BATCH_SIZE_HEADER > 0
-
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
+ final List<Message<byte[]>> messages =
admin.topics().peekMessages(topic, subscription, msgNum);
+ Assert.assertEquals(messages.size(), msgNum);
+
+ MessageImpl message;
+ BrokerEntryMetadata entryMetadata;
+ for (int i = 0; i < msgNum; ++i) {
+ message = (MessageImpl) messages.get(i);
+ Assert.assertEquals(message.getData(), ("hello" + i).getBytes());
+ Assert.assertTrue(message.getPublishTime() >= sendTime);
+ entryMetadata = message.getBrokerEntryMetadata();
+ Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
+ Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
+ System.out.println(message.getProperties());
+
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)),
msgNum);
+ // make sure BATCH_SIZE_HEADER > 0
+
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
+ }
+ // getMessagesById and examineMessage only return the first messages
in the batch
// 2. test for getMessagesById
+ MessageIdImpl messageId = (MessageIdImpl)
messageIdsFuture.get(0).get();
message = (MessageImpl) admin.topics().getMessageById(topic,
messageId.getLedgerId(), messageId.getEntryId());
- Assert.assertEquals(message.getData(), ("hello").getBytes());
+ // getMessagesById return the first message in the batch
+ Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
- Assert.assertEquals(entryMetadata.getIndex(), 0);
+ Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
+
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)),
msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
// 3. test for examineMessage
message = (MessageImpl) admin.topics().examineMessage(topic,
"earliest", 1);
- Assert.assertEquals(message.getData(), ("hello").getBytes());
+ Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
- Assert.assertEquals(entryMetadata.getIndex(), 0);
+ Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
+
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)),
msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 52017444a2b..c8acc7d46f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
@@ -361,8 +362,10 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
for (int i = 0; i < 5; i++) {
// Currently sending a duplicated message won't throw an
exception. Instead, an invalid result is returned.
final MessageId messageId =
producer.newMessage().value("msg").sequenceId(i).send();
- assertTrue(messageId instanceof BatchMessageIdImpl);
- final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl)
messageId;
+ // a duplicated message will send in a single batch, that will
perform as a non-batched sending
+ assertTrue(messageId instanceof MessageIdImpl);
+ assertFalse(messageId instanceof BatchMessageIdImpl);
+ final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
assertEquals(messageIdImpl.getLedgerId(), -1L);
assertEquals(messageIdImpl.getEntryId(), -1L);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 8b533b5b450..72252309e65 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -1080,7 +1080,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
}
@Test(timeOut = 20000)
- public void testHasMessageAvailableWithBatch() throws Exception {
+ public void testHasMessageAvailable() throws Exception {
final String topicName =
"persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
final int numOfMessage = 10;
@@ -1092,11 +1092,11 @@ public class TopicReaderTest extends
ProducerConsumerBase {
//For batch-messages with single message, the type of client messageId
should be the same as that of broker
MessageIdImpl messageId = (MessageIdImpl)
producer.send("msg".getBytes());
- assertTrue(messageId instanceof MessageIdImpl);
+ assertFalse(messageId instanceof BatchMessageIdImpl);
ReaderImpl<byte[]> reader =
(ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl)
reader.getConsumer().getLastMessageId();
- assertTrue(messageId instanceof BatchMessageIdImpl);
+ assertFalse(lastMsgId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
reader.close();
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
new file mode 100644
index 00000000000..b9f7b3f5e6f
--- /dev/null
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+
+/**
+ * An implement of {@link PulsarClientTool} for test, which will publish
messages iff there is enough messages
+ * in the batch.
+ */
+public class PulsarClientToolForceBatchNum extends PulsarClientTool{
+ private final String topic;
+ private final int batchNum;
+
+ /**
+ *
+ * @param properties properties
+ * @param topic topic
+ * @param batchNum iff there is batchNum messages in the batch, the
producer will flush and send.
+ */
+ public PulsarClientToolForceBatchNum(Properties properties, String topic,
int batchNum) {
+ super(properties);
+ this.topic = topic;
+ this.batchNum = batchNum;
+ }
+
+ @Override
+ protected void initJCommander() {
+ super.initJCommander();
+ produceCommand = new CmdProduce() {
+ @Override
+ public void updateConfig(ClientBuilder newBuilder, Authentication
authentication, String serviceURL) {
+ try {
+ super.updateConfig(mockClientBuilder(newBuilder),
authentication, serviceURL);
+ } catch (Exception e) {
+ Assert.fail("update config fail " + e.getMessage());
+ }
+ }
+ };
+ jcommander.addCommand("produce", produceCommand);
+ }
+
+ private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws
Exception {
+ PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+ .batchingMaxBytes(Integer.MAX_VALUE)
+ .batchingMaxMessages(batchNum)
+ .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+ .topic(topic);
+ Producer<byte[]> producer = producerBuilder.create();
+
+ PulsarClientImpl mockClient = spy(client);
+ ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
+ Producer<byte[]> mockProducer = spy(producer);
+ ClientBuilder mockClientBuilder = spy(newBuilder);
+
+ doAnswer((Answer<TypedMessageBuilder>) invocation -> {
+ TypedMessageBuilder typedMessageBuilder =
spy((TypedMessageBuilder) invocation.callRealMethod());
+ doAnswer((Answer<MessageId>) invocation1 -> {
+ TypedMessageBuilder mock = ((TypedMessageBuilder)
invocation1.getMock());
+ // using sendAsync() to replace send()
+ mock.sendAsync();
+ return null;
+ }).when(typedMessageBuilder).send();
+ return typedMessageBuilder;
+ }).when(mockProducer).newMessage();
+
+ doReturn(mockProducer).when(mockProducerBuilder).create();
+
doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
+ doReturn(mockClient).when(mockClientBuilder).build();
+ return mockClientBuilder;
+ }
+}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 5cc5e50b96d..29b82ae401e 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -244,16 +244,19 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("disable-batching");
- final int numberOfMessages = 5;
+ // `numberOfMessages` should be an even number, because we set
`batchNum` as 2, make sure batch and non batch
+ // messages in the same batch
+ final int numberOfMessages = 6;
+ final int batchNum = 2;
@Cleanup
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
- PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
+ PulsarClientTool pulsarClientTool1 = new
PulsarClientToolForceBatchNum(properties, topicName, batchNum);
String[] args1 = {"produce", "-m", "batched", "-n",
Integer.toString(numberOfMessages), topicName};
Assert.assertEquals(pulsarClientTool1.run(args1), 0);
- PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
+ PulsarClientTool pulsarClientTool2 = new
PulsarClientToolForceBatchNum(properties, topicName, batchNum);
String[] args2 = {"produce", "-m", "non-batched", "-n",
Integer.toString(numberOfMessages), "-db", topicName};
Assert.assertEquals(pulsarClientTool2.run(args2), 0);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 44fad489dac..a5e77a5bcdd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -137,8 +137,12 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
MessageImpl<?> msg = messages.get(i);
msg.getDataBuffer().markReaderIndex();
try {
- batchedMessageMetadataAndPayload =
Commands.serializeSingleMessageInBatchWithPayload(
+ if (n == 1) {
+
batchedMessageMetadataAndPayload.writeBytes(msg.getDataBuffer());
+ } else {
+ batchedMessageMetadataAndPayload =
Commands.serializeSingleMessageInBatchWithPayload(
msg.getMessageBuilder(), msg.getDataBuffer(),
batchedMessageMetadataAndPayload);
+ }
} catch (Throwable th) {
// serializing batch message can corrupt the index of message
and batch-message. Reset the index so,
// next iteration doesn't send corrupt message to broker.
@@ -211,6 +215,38 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
@Override
public OpSendMsg createOpSendMsg() throws IOException {
+ if (messages.size() == 1) {
+ messageMetadata.clear();
+ messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
+ ByteBuf encryptedPayload =
producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
+ ByteBufPair cmd = producer.sendMessage(producer.producerId,
messageMetadata.getSequenceId(),
+ 1, messageMetadata, encryptedPayload);
+ final OpSendMsg op;
+
+ // Shouldn't call create(MessageImpl<?> msg, ByteBufPair cmd, long
sequenceId, SendCallback callback),
+ // otherwise it will bring message out of order problem.
+ // Because when invoke `ProducerImpl.processOpSendMsg` on flush,
+ // if `op.msg != null && isBatchMessagingEnabled()` checks true,
it will call `batchMessageAndSend` to flush
+ // messageContainers before publishing this one-batch message.
+ op = OpSendMsg.create(messages, cmd,
messageMetadata.getSequenceId(), firstCallback);
+
+ // NumMessagesInBatch and BatchSizeByte will not be serialized to
the binary cmd. It's just useful for the
+ // ProducerStats
+ op.setNumMessagesInBatch(1);
+ op.setBatchSizeByte(encryptedPayload.readableBytes());
+
+ // handle mgs size check as non-batched in
`ProducerImpl.isMessageSizeExceeded`
+ if (op.getMessageHeaderAndPayloadSize() >
ClientCnx.getMaxMessageSize()) {
+ producer.semaphoreRelease(1);
+
producer.client.getMemoryLimitController().releaseMemory(messages.get(0).getUncompressedSize());
+ discard(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ return null;
+ }
+ lowestSequenceId = -1L;
+ return op;
+ }
+
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 792c8596d1b..5b5fe8f9dc5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1479,6 +1479,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
void setMessageId(long ledgerId, long entryId, int partitionIndex) {
if (msg != null) {
msg.setMessageId(new MessageIdImpl(ledgerId, entryId,
partitionIndex));
+ } else if (msgs.size() == 1) {
+ // If there is only one message in batch, the producer will
publish messages like non-batch
+ msgs.get(0).setMessageId(new MessageIdImpl(ledgerId, entryId,
partitionIndex));
} else {
for (int batchIndex = 0; batchIndex < msgs.size();
batchIndex++) {
msgs.get(batchIndex)