This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0a635c53cf755cabef18f44223a832efeb52a489 Author: JiangHaiting <[email protected]> AuthorDate: Tue Feb 8 23:21:22 2022 +0800 fix consume failure when BatchReceivePolicy#maxNumBytes < message size (#14139) (cherry picked from commit 88fc8445213650f3ab8eb4e3a8cc6fbe24545d07) --- .../client/api/ConsumerBatchReceiveTest.java | 22 ++++++++++++++++++++++ .../apache/pulsar/client/impl/MessagesImpl.java | 4 ++++ 2 files changed, 26 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index 5522dd7..1473f28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -403,6 +403,28 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(consumer, batchReceivePolicy, messagesToSend / muxNumMessages); } + @Test + public void verifyNumBytesSmallerThanMessageSize() throws Exception { + final int messagesToSend = 500; + + final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID(); + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(10).build(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("s2") + .batchReceivePolicy(batchReceivePolicy) + .subscribe(); + + sendMessagesAsyncAndWait(producer, messagesToSend); + CountDownLatch latch = new CountDownLatch(messagesToSend+1); + receiveAsync(consumer, messagesToSend, latch); + latch.await(); + } + private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer, BatchReceivePolicy batchReceivePolicy, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index 4ff23eb..bb0335b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -45,6 +45,10 @@ public class MessagesImpl<T> implements Messages<T> { } protected boolean canAdd(Message<T> message) { + if (currentNumberOfMessages == 0) { + // It's ok to add at least one message into a batch. + return true; + } if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) { return false; }
