This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 2d83d0a9339 [fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs
value when maxUnackedMessagesPerConsumer is 1 (#23796)
2d83d0a9339 is described below
commit 2d83d0a93391c4536560c9535ae6da24680e93c2
Author: Jiawen Wang <[email protected]>
AuthorDate: Fri Feb 14 17:56:45 2025 +0800
[fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when
maxUnackedMessagesPerConsumer is 1 (#23796)
(cherry picked from commit 5443c69d84818cb4a49704f7ab7dbccf65b2179a)
---
.../org/apache/pulsar/broker/service/Consumer.java | 7 +++-
.../pulsar/broker/service/BrokerServiceTest.java | 41 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 9cf50165cd4..dd7b4f2e40c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -560,6 +560,7 @@ public class Consumer {
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize,
position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer,
position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+ updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
}
@@ -1036,6 +1037,11 @@ public class Consumer {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName,
subscription, consumerId, position);
}
+ updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
+ return true;
+ }
+
+ public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
// unblock consumer-throttling when limit check is disabled or
receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
@@ -1045,7 +1051,6 @@ public class Consumer {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
- return true;
}
public ConcurrentLongLongPairHashMap getPendingAcks() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index bd53ff31ec3..98524714dab 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1791,6 +1791,47 @@ public class BrokerServiceTest extends BrokerTestBase {
.get("sub-1").getUnackedMessages(), 0);
}
+ @Test
+ public void testBlockedConsumerOnUnackedMsgs() throws Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1);
+
+ final String topicName =
"persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("sub-test")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(true)
+ .receiverQueueSize(0)
+ .subscribe();
+
+ producer.send("1".getBytes(StandardCharsets.UTF_8));
+ producer.send("2".getBytes(StandardCharsets.UTF_8));
+
+ // 1. receive message
+ Message<byte[]> message = consumer.receive();
+ Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+
+ SubscriptionStats subscriptionStats =
admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
+ assertEquals(subscriptionStats.getUnackedMessages(), 1);
+
assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
+
+ // 2、ack this message
+ consumer.acknowledge(message);
+ Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+
+ subscriptionStats =
admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
+ assertEquals(subscriptionStats.getUnackedMessages(), 0);
+
assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
+ }
+
@Test
public void testUnsubscribeNonDurableSub() throws Exception {
final String ns = "prop/ns-test";