This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5443c69d848 [fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs
value when maxUnackedMessagesPerConsumer is 1 (#23796)
5443c69d848 is described below
commit 5443c69d84818cb4a49704f7ab7dbccf65b2179a
Author: Jiawen Wang <[email protected]>
AuthorDate: Fri Feb 14 17:56:45 2025 +0800
[fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when
maxUnackedMessagesPerConsumer is 1 (#23796)
---
.../org/apache/pulsar/broker/service/Consumer.java | 7 +++-
.../pulsar/broker/service/BrokerServiceTest.java | 41 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index b46e10a20fd..61f9d5c86b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -595,6 +595,7 @@ public class Consumer {
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+ updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
}
@@ -1081,6 +1082,11 @@ public class Consumer {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
+ updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
+ return true;
+ }
+
+ public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
@@ -1090,7 +1096,6 @@ public class Consumer {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
- return true;
}
public PendingAcksMap getPendingAcks() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 89727014be9..fa76fdd5bf4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1790,6 +1790,47 @@ public class BrokerServiceTest extends BrokerTestBase {
.get("sub-1").getUnackedMessages(), 0);
}
+ @Test
+ public void testBlockedConsumerOnUnackedMsgs() throws Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1);
+
+ final String topicName = "persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("sub-test")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(true)
+ .receiverQueueSize(0)
+ .subscribe();
+
+ producer.send("1".getBytes(StandardCharsets.UTF_8));
+ producer.send("2".getBytes(StandardCharsets.UTF_8));
+
+ // 1. receive message
+ Message<byte[]> message = consumer.receive();
+ Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
+ assertEquals(subscriptionStats.getUnackedMessages(), 1);
+ assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
+
+ // 2、ack this message
+ consumer.acknowledge(message);
+ Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+
+ subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
+ assertEquals(subscriptionStats.getUnackedMessages(), 0);
+ assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
+ }
+
@Test
public void testUnsubscribeNonDurableSub() throws Exception {
final String ns = "prop/ns-test";