This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f30f348ccf2d197d48ae6caefeff07dff1ff87e5 Author: Penghui Li <peng...@apache.org> AuthorDate: Wed Jul 20 10:51:35 2022 +0800 [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670) (cherry picked from commit 42fe0603518be7db7a14802eb4274b6ea22b0c9a) --- .../PersistentDispatcherMultipleConsumers.java | 3 ++ .../client/api/SimpleProducerConsumerTest.java | 40 ++++++++++++++++++++++ .../client/api/v1/V1_ProducerConsumerTest.java | 2 +- 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 03cb72e3e61..3f33995c6a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -526,6 +526,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul // round-robin dispatch batch size for this consumer int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1; + if (c.getMaxUnackedMessages() > 0) { + availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages()); + } if (log.isDebugEnabled() && !c.isWritable()) { log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; " + "availablePermits are {}", topic.getName(), name, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 24302a16f8e..57f82136636 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1596,6 +1596,46 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } } + @Test(dataProvider = "ackReceiptEnabled") + public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException { + final int maxUnacks = 10; + pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks); + final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits"; + + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic).subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(ackReceiptEnabled) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false) + .topic(topic) + .create(); + + final int messages = 1000; + for (int i = 0; i < messages; i++) { + producer.sendAsync("Message - " + i); + } + producer.flush(); + List<MessageId> receives = new ArrayList<>(); + for (int i = 0; i < maxUnacks; i++) { + Message<String> received = consumer.receive(); + log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId()); + receives.add(received.getMessageId()); + } + assertNull(consumer.receive(3, TimeUnit.SECONDS)); + consumer.acknowledge(receives); + for (int i = 0; i < messages - maxUnacks; i++) { + Message<String> received = consumer.receive(); + log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId()); + consumer.acknowledge(received); + } + } + /** * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message. * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 55c120592e1..e4cb941c650 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -1708,7 +1708,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { } // client should not receive all produced messages and should be blocked due to unack-messages - assertEquals(messages1.size(), receiverQueueSize); + assertEquals(messages1.size(), unAckedMessagesBufferSize); Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> { return (MessageIdImpl) m.getMessageId(); }).collect(Collectors.toSet());