This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5443c69d848 [fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs 
value when maxUnackedMessagesPerConsumer is 1 (#23796)
5443c69d848 is described below

commit 5443c69d84818cb4a49704f7ab7dbccf65b2179a
Author: Jiawen Wang <[email protected]>
AuthorDate: Fri Feb 14 17:56:45 2025 +0800

    [fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when 
maxUnackedMessagesPerConsumer is 1 (#23796)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  7 +++-
 .../pulsar/broker/service/BrokerServiceTest.java   | 41 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index b46e10a20fd..61f9d5c86b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -595,6 +595,7 @@ public class Consumer {
                 ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position, ackOwnerConsumer);
                 if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, 
position, msgId)) {
                     addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+                    updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
                 }
             }
 
@@ -1081,6 +1082,11 @@ public class Consumer {
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] consumer {} received ack {}", topicName, 
subscription, consumerId, position);
         }
+        updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
+        return true;
+    }
+
+    public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
         // unblock consumer-throttling when limit check is disabled or 
receives half of maxUnackedMessages =>
         // consumer can start again consuming messages
         int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
@@ -1090,7 +1096,6 @@ public class Consumer {
             ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
             flowConsumerBlockedPermits(ackOwnedConsumer);
         }
-        return true;
     }
 
     public PendingAcksMap getPendingAcks() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 89727014be9..fa76fdd5bf4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1790,6 +1790,47 @@ public class BrokerServiceTest extends BrokerTestBase {
                 .get("sub-1").getUnackedMessages(), 0);
     }
 
+    @Test
+    public void testBlockedConsumerOnUnackedMsgs() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1);
+
+        final String topicName = 
"persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("sub-test")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true)
+                .receiverQueueSize(0)
+                .subscribe();
+
+        producer.send("1".getBytes(StandardCharsets.UTF_8));
+        producer.send("2".getBytes(StandardCharsets.UTF_8));
+
+        // 1. receive message
+        Message<byte[]> message = consumer.receive();
+        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+
+        SubscriptionStats subscriptionStats = 
admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
+        assertEquals(subscriptionStats.getUnackedMessages(), 1);
+        
assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
+
+        // 2、ack this message
+        consumer.acknowledge(message);
+        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+
+        subscriptionStats = 
admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
+        assertEquals(subscriptionStats.getUnackedMessages(), 0);
+        
assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
+    }
+
     @Test
     public void testUnsubscribeNonDurableSub() throws Exception {
         final String ns = "prop/ns-test";

Reply via email to