This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 29ee145f53873a3a9b7f087c3c2c97749f3ca695
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu May 2 16:57:49 2024 -0700

    [fix] [client] Fix Consumer should return configured batch receive max messages (#22619)
    
    (cherry picked from commit 0219921b5b7cd157092ac8f2d86ab7e60787d36c)
---
 .../client/api/ConsumerBatchReceiveTest.java       |  8 +++---
 .../client/api/SimpleProducerConsumerTest.java     | 29 ++++++++++++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  4 +++
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index d54b1c99e3e..974d25aad64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -112,7 +112,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
                 // Number of message limitation exceed receiverQueue size
                 {
                     BatchReceivePolicy.builder()
-                        .maxNumMessages(70)
+                        .maxNumMessages(50)
                         .build(), true, 50, false
                 },
                 // Number of message limitation exceed receiverQueue size and timeout limitation
@@ -147,7 +147,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
                 // Number of message limitation exceed receiverQueue size
                 {
                     BatchReceivePolicy.builder()
-                        .maxNumMessages(70)
+                        .maxNumMessages(50)
                         .build(), false, 50, false
                 },
                 // Number of message limitation exceed receiverQueue size and timeout limitation
@@ -248,7 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
                 // Number of message limitation exceed receiverQueue size
                 {
                         BatchReceivePolicy.builder()
-                                .maxNumMessages(70)
+                                .maxNumMessages(50)
                                 .build(), true, 50, true
                 },
                 // Number of message limitation exceed receiverQueue size and timeout limitation
@@ -283,7 +283,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
                 // Number of message limitation exceed receiverQueue size
                 {
                         BatchReceivePolicy.builder()
-                                .maxNumMessages(70)
+                                .maxNumMessages(50)
                                 .build(), false, 50, true
                 },
                 // Number of message limitation exceed receiverQueue size and timeout limitation
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 68158dd69a4..7877d5ab604 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4825,6 +4825,35 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         admin.topics().delete(topic, false);
     }
 
+    /**
+     * It verifies that consumer receives configured number of messages into the batch.
+     * @throws Exception
+     */
+    @Test
+    public void testBatchReceiveWithMaxBatchSize() throws Exception {
+        int maxBatchSize = 100;
+        final int internalQueueSize = 10;
+        final int maxBytes = 2000000;
+        final int timeOutInSeconds = 900;
+        final String topic = "persistent://my-property/my-ns/testBatchReceive";
+        BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(maxBytes)
+                .maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, TimeUnit.SECONDS).build();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName("my-subscriber-name")
+                .receiverQueueSize(internalQueueSize)
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+
+        final int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).send();
+        }
+
+        assertEquals(consumer.batchReceive().size(), maxBatchSize);
+    }
+
     private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2) {
         if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
             return -1;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f644c6a1839..7686d0072cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -120,6 +120,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
         }
+        if (conf.getBatchReceivePolicy() != null) {
+            conf.setReceiverQueueSize(
+                    Math.max(conf.getBatchReceivePolicy().getMaxNumMessages(), conf.getReceiverQueueSize()));
+        }
         CompletableFuture<Void> applyDLQConfig;
         if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
             TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());

Reply via email to