This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0a635c53cf755cabef18f44223a832efeb52a489
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Feb 8 23:21:22 2022 +0800

    fix consume failure when BatchReceivePolicy#maxNumBytes < message size 
(#14139)
    
    (cherry picked from commit 88fc8445213650f3ab8eb4e3a8cc6fbe24545d07)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 22 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/MessagesImpl.java    |  4 ++++
 2 files changed, 26 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 5522dd7..1473f28 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -403,6 +403,28 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
         receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(consumer, 
batchReceivePolicy, messagesToSend / muxNumMessages);
     }
 
+    @Test
+    public void verifyNumBytesSmallerThanMessageSize() throws Exception {
+        final int messagesToSend = 500;
+
+        final String topic = "persistent://my-property/my-ns/batch-receive-" + 
UUID.randomUUID();
+        BatchReceivePolicy batchReceivePolicy = 
BatchReceivePolicy.builder().maxNumBytes(10).build();
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("s2")
+                .batchReceivePolicy(batchReceivePolicy)
+                .subscribe();
+
+        sendMessagesAsyncAndWait(producer, messagesToSend);
+        CountDownLatch latch = new CountDownLatch(messagesToSend+1);
+        receiveAsync(consumer, messagesToSend, latch);
+        latch.await();
+    }
+
 
     private void 
receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> 
consumer,
                                                                             
BatchReceivePolicy batchReceivePolicy,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index 4ff23eb..bb0335b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -45,6 +45,10 @@ public class MessagesImpl<T> implements Messages<T> {
     }
 
     protected boolean canAdd(Message<T> message) {
+        if (currentNumberOfMessages == 0) {
+            // It's ok to add at least one message into a batch.
+            return true;
+        }
         if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > 
maxNumberOfMessages) {
             return false;
         }

Reply via email to