This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4a674c2283c [fix][broker] fix broker unackmessages number reduce error (#17003)
4a674c2283c is described below

commit 4a674c2283cbe265ab3dde8ff366b091b845717c
Author: congbo <[email protected]>
AuthorDate: Sat Aug 13 10:29:26 2022 +0800

    [fix][broker] fix broker unackmessages number reduce error (#17003)
    
    (cherry picked from commit 5262e6c8b4d2a98ac7f73a94a30f001630b2be28)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 13 ++--
 .../BatchMessageWithBatchIndexLevelTest.java       | 82 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 52a91c5ef3e..4ff0a48ba1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -423,7 +423,7 @@ public class Consumer {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
-                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
+                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer);
                 if (isTransactionEnabled()) {
                     //sync the batch position bit set point, in order to delete the position in pending acks
                     if (Subscription.isIndividualAckMode(subType)) {
@@ -433,7 +433,7 @@ public class Consumer {
                 }
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
-                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
+                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
             }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
@@ -541,20 +541,21 @@ public class Consumer {
         return batchSize;
     }
 
-    private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
+    private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
         if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
-                return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
+                return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer);
             }
         }
         return batchSize;
     }
 
-    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
+    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets,
+                                                        Consumer consumer) {
         long ackedCount = 0;
         if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)
-            && pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
+            && consumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) != null) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
                 BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index b953772ad67..d5c4e1eb064 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -33,11 +33,13 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 
 @Test(groups = "broker")
 public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
@@ -195,4 +197,84 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
             assertEquals(unackedMessages, 10);
         });
     }
+
+    @Test
+    public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Exception {
+        final String subName = "test";
+        final String topicName = "persistent://prop/ns-abc/testAckMessageWithNotOwnerConsumerUnAckMessageCount-"
+                + UUID.randomUUID();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topicName)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .enableBatching(true)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .consumerName("consumer-1")
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .consumerName("consumer-2")
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync();
+        }
+
+        // consume-1 receive 5 batch messages
+        List<MessageId> list = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            list.add(consumer1.receive().getMessageId());
+        }
+
+        // consumer-1 redeliver the batch messages
+        consumer1.negativeAcknowledge(list.get(0));
+
+        // consumer-2 will receive the messages that the consumer-1 redelivered
+        for (int i = 0; i < 5; i++) {
+            consumer2.receive().getMessageId();
+        }
+
+        // consumer1 ack two messages in the batch message
+        consumer1.acknowledge(list.get(1));
+        consumer1.acknowledge(list.get(2));
+
+        // consumer-2 redeliver the rest of the messages
+        consumer2.negativeAcknowledge(list.get(1));
+
+        // consume-1 close will redeliver the rest messages to consumer-2
+        consumer1.close();
+
+        // consumer-2 can receive the rest of 3 messages
+        for (int i = 0; i < 3; i++) {
+            consumer2.acknowledge(consumer2.receive().getMessageId());
+        }
+
+        // consumer-2 can't receive any messages, all the messages in batch has been acked
+        Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
+        assertNull(message);
+
+        // the number of consumer-2's unacked messages is 0
+        Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false)
+                .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0);
+    }
 }

Reply via email to