This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f30f348ccf2d197d48ae6caefeff07dff1ff87e5
Author: Penghui Li <peng...@apache.org>
AuthorDate: Wed Jul 20 10:51:35 2022 +0800

    [fix][broker] Fix consumer does not abide by the max unacks limitation for 
Shared subscription (#16670)
    
    (cherry picked from commit 42fe0603518be7db7a14802eb4274b6ea22b0c9a)
---
 .../PersistentDispatcherMultipleConsumers.java     |  3 ++
 .../client/api/SimpleProducerConsumerTest.java     | 40 ++++++++++++++++++++++
 .../client/api/v1/V1_ProducerConsumerTest.java     |  2 +-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 03cb72e3e61..3f33995c6a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -526,6 +526,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
             // round-robin dispatch batch size for this consumer
             int availablePermits = c.isWritable() ? c.getAvailablePermits() : 
1;
+            if (c.getMaxUnackedMessages() > 0) {
+                availablePermits = Math.min(availablePermits, 
c.getMaxUnackedMessages() - c.getUnackedMessages());
+            }
             if (log.isDebugEnabled() && !c.isWritable()) {
                 log.debug("[{}-{}] consumer is not writable. dispatching only 
1 message to {}; "
                                 + "availablePermits are {}", topic.getName(), 
name,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 24302a16f8e..57f82136636 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1596,6 +1596,46 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testMaxUnAckMessagesLowerThanPermits(boolean 
ackReceiptEnabled) throws PulsarClientException {
+        final int maxUnacks = 10;
+        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
+        final String topic = 
"persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(ackReceiptEnabled)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic)
+                .create();
+
+        final int messages = 1000;
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Message - " + i);
+        }
+        producer.flush();
+        List<MessageId> receives = new ArrayList<>();
+        for (int i = 0; i < maxUnacks; i++) {
+            Message<String> received =  consumer.receive();
+            log.info("Received message {} with message ID {}", 
received.getValue(), received.getMessageId());
+            receives.add(received.getMessageId());
+        }
+        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+        consumer.acknowledge(receives);
+        for (int i = 0; i < messages - maxUnacks; i++) {
+            Message<String> received =  consumer.receive();
+            log.info("Received message {} with message ID {}", 
received.getValue(), received.getMessageId());
+            consumer.acknowledge(received);
+        }
+    }
+
     /**
      * Verify: Consumer1 which doesn't send ack will not impact Consumer2 
which sends ack for consumed message.
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 55c120592e1..e4cb941c650 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -1708,7 +1708,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
             }
 
             // client should not receive all produced messages and should be 
blocked due to unack-messages
-            assertEquals(messages1.size(), receiverQueueSize);
+            assertEquals(messages1.size(), unAckedMessagesBufferSize);
             Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m 
-> {
                 return (MessageIdImpl) m.getMessageId();
             }).collect(Collectors.toSet());

Reply via email to