This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 29ee145f53873a3a9b7f087c3c2c97749f3ca695 Author: Rajan Dhabalia <[email protected]> AuthorDate: Thu May 2 16:57:49 2024 -0700 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619) (cherry picked from commit 0219921b5b7cd157092ac8f2d86ab7e60787d36c) --- .../client/api/ConsumerBatchReceiveTest.java | 8 +++--- .../client/api/SimpleProducerConsumerTest.java | 29 ++++++++++++++++++++++ .../pulsar/client/impl/ConsumerBuilderImpl.java | 4 +++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index d54b1c99e3e..974d25aad64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -112,7 +112,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() - .maxNumMessages(70) + .maxNumMessages(50) .build(), true, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -147,7 +147,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() - .maxNumMessages(70) + .maxNumMessages(50) .build(), false, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -248,7 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() - .maxNumMessages(70) + .maxNumMessages(50) .build(), true, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -283,7 +283,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() - .maxNumMessages(70) + .maxNumMessages(50) .build(), false, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 68158dd69a4..7877d5ab604 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4825,6 +4825,35 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { admin.topics().delete(topic, false); } + /** + * It verifies that consumer receives configured number of messages into the batch. + * @throws Exception + */ + @Test + public void testBatchReceiveWithMaxBatchSize() throws Exception { + int maxBatchSize = 100; + final int internalQueueSize = 10; + final int maxBytes = 2000000; + final int timeOutInSeconds = 900; + final String topic = "persistent://my-property/my-ns/testBatchReceive"; + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(maxBytes) + .maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, TimeUnit.SECONDS).build(); + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName("my-subscriber-name") + .receiverQueueSize(internalQueueSize) + .batchReceivePolicy(batchReceivePolicy).subscribe(); + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + + final int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).send(); + } + + assertEquals(consumer.batchReceive().size(), maxBatchSize); + } + private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2) { if (messageId2.getLedgerId() < messageId1.getLedgerId()) { return -1; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f644c6a1839..7686d0072cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -120,6 +120,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { return FutureUtil.failedFuture( new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription")); } + if (conf.getBatchReceivePolicy() != null) { + conf.setReceiverQueueSize( + Math.max(conf.getBatchReceivePolicy().getMaxNumMessages(), conf.getReceiverQueueSize())); + } CompletableFuture<Void> applyDLQConfig; if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) { TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
