This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 40579c5ab328c3e9c41a7e49a1c29271ddb61d89
Author: congbo <[email protected]>
AuthorDate: Sun Jan 30 01:08:11 2022 +0800

    Fix NPE of cumulative ack mode and incorrect unack message count (#14021)
    
    link https://github.com/apache/pulsar/pull/13383
    ## Motivation
    #13383 fixed the batch message ack not decreasing the unacked-message
count. However, the count is now also decreased in cumulative ack mode, which
uses pendingAcks, and pendingAcks is not initialized in cumulative ack mode,
so an NPE is thrown.
    
    
![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png)
    
![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png)
    
    If the batch indexes are acked one by one, the last ack of a batch
decreases the unacked message count by `batchSize`:
    ```
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 1
    ================ batch size -> 10
    ================ message id -> 3:1
    ================ acked count -> 9
    ================ batch size -> 10
    ```
    
    ### Modifications
    Add a check for `Subscription.isIndividualAckMode(subType)` when computing
the acked count.
    If the ack from the consumer does not have an ack set, we should treat it
as an empty ack set and calculate the acked count against the current ack set.
    
    (cherry picked from commit 618f17c155a8e28a99bbcc5e26f9ec3a6c7d9b08)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  7 ++-
 .../BatchMessageWithBatchIndexLevelTest.java       | 68 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index cf9f5bf..678bdcf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -122,6 +122,7 @@ public class Consumer {
     private static final AtomicIntegerFieldUpdater<Consumer> 
AVG_MESSAGES_PER_ENTRY =
             AtomicIntegerFieldUpdater.newUpdater(Consumer.class, 
"avgMessagesPerEntry");
     private volatile int avgMessagesPerEntry = 1000;
+    private static final long [] EMPTY_ACK_SET = new long[0];
 
     private static final double avgPercent = 0.9;
     private boolean preciseDispatcherFlowControl;
@@ -413,10 +414,10 @@ public class Consumer {
                 }
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
-                if (isAcknowledgmentAtBatchIndexLevelEnabled) {
+                if (Subscription.isIndividualAckMode(subType) && 
isAcknowledgmentAtBatchIndexLevelEnabled) {
                     long[] cursorAckSet = getCursorAckSet(position);
                     if (cursorAckSet != null) {
-                        ackedCount = batchSize - 
BitSet.valueOf(cursorAckSet).cardinality();
+                        ackedCount = 
getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
                     } else {
                         ackedCount = batchSize;
                     }
@@ -521,7 +522,7 @@ public class Consumer {
 
     private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, 
long batchSize, long[] ackSets) {
         long ackedCount = 0;
-        if (isAcknowledgmentAtBatchIndexLevelEnabled) {
+        if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
                 BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 20ba4f1..5e09def 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -19,21 +19,27 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.collect.Lists;
+import lombok.Cleanup;
 import lombok.SneakyThrows;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import static org.testng.Assert.assertEquals;
 
@@ -105,4 +111,66 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
             
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
         });
     }
+
+    @DataProvider(name = "testSubTypeAndEnableBatch")
+    public Object[][] testSubTypeAndEnableBatch() {
+        return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
+                { SubscriptionType.Failover, Boolean.TRUE },
+                { SubscriptionType.Shared, Boolean.FALSE },
+                { SubscriptionType.Failover, Boolean.FALSE }};
+    }
+
+
+    @Test(dataProvider="testSubTypeAndEnableBatch")
+    private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType 
subType,
+                                                             boolean 
enableBatch) throws Exception {
+
+        final int messageCount = 50;
+        final String topicName = 
"persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
+        final String subscriptionName = "sub-batch-1";
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
+                .newConsumer(Schema.BYTES)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(subType)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .enableBatching(enableBatch)
+                .topic(topicName)
+                .batchingMaxMessages(10)
+                .create();
+
+        CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+        for (int i = 0; i < messageCount; i++) {
+            producer.sendAsync((i + 
"").getBytes()).thenRun(countDownLatch::countDown);
+        }
+
+        countDownLatch.await();
+
+        for (int i = 0; i < messageCount; i++) {
+            Message<byte[]> message = consumer.receive();
+            // wait for receipt
+            if (i < messageCount / 2) {
+                consumer.acknowledgeAsync(message.getMessageId()).get();
+            }
+        }
+
+        String topic = TopicName.get(topicName).toString();
+        PersistentSubscription persistentSubscription =  
(PersistentSubscription) pulsar.getBrokerService()
+                .getTopic(topic, 
false).get().get().getSubscription(subscriptionName);
+
+        Awaitility.await().untilAsserted(() -> {
+            if (subType == SubscriptionType.Shared) {
+                
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
messageCount / 2);
+            } else {
+                
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
0);
+            }
+        });
+    }
 }

Reply via email to