This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6e592087999 [fix][broker] Fix incorrect unack msg count when dup ack a 
message (#20990)
6e592087999 is described below

commit 6e5920879996de4f43dccb4809a910b227f931f5
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Aug 18 22:54:44 2023 +0800

    [fix][broker] Fix incorrect unack msg count when dup ack a message (#20990)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 24 +++++++++++++-------
 .../pulsar/broker/service/BrokerServiceTest.java   | 26 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 26dc27a0c02..71a5bc429d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -491,6 +491,7 @@ public class Consumer {
     private CompletableFuture<Long> individualAckNormal(CommandAck ack, 
Map<String, Long> properties) {
         List<Position> positionsAcked = new ArrayList<>();
         long totalAckCount = 0;
+        boolean individualAck = false;
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
@@ -514,14 +515,18 @@ public class Consumer {
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
                 ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position, ackOwnerConsumer);
+                individualAck = true;
             }
 
-            addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
-
+            if (individualAck) {
+                if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
+                    addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+                }
+            } else {
+                addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+            }
             positionsAcked.add(position);
 
-            checkCanRemovePendingAcksAndHandle(position, msgId);
-
             checkAckValidationError(ack, position);
 
             totalAckCount += ackedCount;
@@ -683,10 +688,11 @@ public class Consumer {
         }
     }
 
-    private void checkCanRemovePendingAcksAndHandle(PositionImpl position, 
MessageIdData msgId) {
+    private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, 
MessageIdData msgId) {
         if (Subscription.isIndividualAckMode(subType) && 
msgId.getAckSetsCount() == 0) {
-            removePendingAcks(position);
+            return removePendingAcks(position);
         }
+        return false;
     }
 
     private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
@@ -953,7 +959,7 @@ public class Consumer {
      *
      * @param position
      */
-    private void removePendingAcks(PositionImpl position) {
+    private boolean removePendingAcks(PositionImpl position) {
         Consumer ackOwnedConsumer = null;
         if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == 
null) {
             for (Consumer consumer : subscription.getConsumers()) {
@@ -974,7 +980,7 @@ public class Consumer {
         if (ackedPosition != null) {
             if 
(!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), 
position.getEntryId())) {
                 // Message was already removed by the other consumer
-                return;
+                return false;
             }
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] consumer {} received ack {}", topicName, 
subscription, consumerId, position);
@@ -988,7 +994,9 @@ public class Consumer {
                 ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
                 flowConsumerBlockedPermits(ackOwnedConsumer);
             }
+            return true;
         }
+        return false;
     }
 
     public ConcurrentLongLongPairHashMap getPendingAcks() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 1eb02b71972..9012c6edb2f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1597,4 +1597,30 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertTrue(brokerService.isAllowAutoTopicCreationAsync(
                 "persistent://pulsar/system/my-system-topic").get());
     }
+
+    @Test
+    public void testDuplicateAcknowledgement() throws Exception {
+        final String ns = "prop/ns-test";
+
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/duplicated-acknowledgement-test";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("sub-1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        producer.send("1".getBytes(StandardCharsets.UTF_8));
+        Message<byte[]> message = consumer1.receive();
+        consumer1.acknowledge(message);
+        consumer1.acknowledge(message);
+        assertEquals(admin.topics().getStats(topicName).getSubscriptions()
+                .get("sub-1").getUnackedMessages(), 0);
+    }
 }

Reply via email to