This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 48dbf01f3e981856fa3b2129ae8d2963f494dc52
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Feb 15 21:39:10 2022 +0800

    Fix negative batch ack count issue. (#14288)
    
    ### Motivation
    
    After #13383 fixed the batch ack issue, we found that the unack-msg count 
could still be negative (#14246). At first, we thought this was a normal case 
caused by msg redelivery, but after diving into the logic, we found it is a bug.
    
    The test is copied from #14246:
    
    ```
    for (int i = 0; i < 50; i++) {
          Message<String> msg = consumer.receive();
          if (i % 2 == 0) {
               consumer.acknowledgeAsync(msg);
           } else {
                consumer.negativeAcknowledge(msg);
           }
    }
    ```
    When a msg is negatively acknowledged via `negativeAcknowledge`, 
Consumer#redeliverUnacknowledgedMessages will invoke:
    
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L900-L912
    
    When calculating `totalRedeliveryMessages`, it checks that `pendingAcks` 
contains this message, and removes it from `pendingAcks` after that. (Dispatching 
messages adds them to `pendingAcks`.)
    So in the above test, it can happen that `negativeAcknowledge` is processed 
first and `acknowledgeAsync` afterwards.
    `acknowledgeAsync` maps to `Consumer#individualAckNormal`, which decreases 
the unack-msg count in:
    
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L543-L561
    It doesn't check `pendingAcks` first; this is the root cause. The 
`pendingAcks` check on line 556 should be moved up to line 545.
    
    (cherry picked from commit 6b828b41382e5a94f89d628aca38871ccff8df9d)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  6 +-
 .../BatchMessageWithBatchIndexLevelTest.java       | 96 +++++++++++++++++++++-
 2 files changed, 96 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 00511f4..bfaa660 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -526,7 +526,8 @@ public class Consumer {
 
     private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, 
long batchSize, long[] ackSets) {
         long ackedCount = 0;
-        if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)) {
+        if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)
+            && pendingAcks.get(position.getLedgerId(), position.getEntryId()) 
!= null) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
                 BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
@@ -537,7 +538,7 @@ public class Consumer {
                 int currentCardinality = cursorBitSet.cardinality();
                 ackedCount = lastCardinality - currentCardinality;
                 cursorBitSet.recycle();
-            } else if (pendingAcks.get(position.getLedgerId(), 
position.getEntryId()) != null) {
+            } else {
                 ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality();
             }
         }
@@ -558,6 +559,7 @@ public class Consumer {
             if (cursorAckSet != null) {
                 BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
                 unAckedCount = cursorBitSet.cardinality();
+                cursorBitSet.recycle();
             }
         }
         return unAckedCount;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index c3785c7..b953772 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -19,24 +19,24 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.collect.Lists;
+import lombok.Cleanup;
 import lombok.SneakyThrows;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import static org.testng.Assert.assertEquals;
 
 @Test(groups = "broker")
@@ -56,7 +56,8 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
         final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + 
UUID.randomUUID();
         final String subscriptionName = "sub-batch-1";
 
-        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
                 .newConsumer()
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
@@ -66,6 +67,7 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
                 .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
                 .subscribe();
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(topicName)
@@ -107,4 +109,90 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
             
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
         });
     }
+
+    @Test
+    public void testBatchMessageMultiNegtiveAck() throws Exception{
+        final String topicName = 
"persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
+        final String subscriptionName = "sub-negtive-1";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(10)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient
+                .newProducer(Schema.STRING)
+                .topic(topicName)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .enableBatching(true)
+                .create();
+
+        final int N = 20;
+        for (int i = 0; i < N; i++) {
+            String value = "test-" + i;
+            producer.sendAsync(value);
+        }
+        producer.flush();
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            if (i % 2 == 0) {
+                consumer.acknowledgeAsync(msg);
+            } else {
+                consumer.negativeAcknowledge(msg);
+            }
+        }
+        Awaitility.await().untilAsserted(() -> {
+            long unackedMessages = 
admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName)
+                    .getUnackedMessages();
+            assertEquals(unackedMessages, 10);
+        });
+
+        // Test negtive ack with sleep
+        final String topicName2 = 
"persistent://prop/ns-abc/batchMessageMultiNegtiveAck2-" + UUID.randomUUID();
+        final String subscriptionName2 = "sub-negtive-2";
+        @Cleanup
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName2)
+                .subscriptionName(subscriptionName2)
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(10)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer2 = pulsarClient
+                .newProducer(Schema.STRING)
+                .topic(topicName2)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .enableBatching(true)
+                .create();
+
+        for (int i = 0; i < N; i++) {
+            String value = "test-" + i;
+            producer2.sendAsync(value);
+        }
+        producer2.flush();
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer2.receive();
+            if (i % 2 == 0) {
+                consumer.acknowledgeAsync(msg);
+            } else {
+                consumer.negativeAcknowledge(msg);
+                Thread.sleep(100);
+            }
+        }
+        Awaitility.await().untilAsserted(() -> {
+            long unackedMessages = 
admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName)
+                    .getUnackedMessages();
+            assertEquals(unackedMessages, 10);
+        });
+    }
 }

Reply via email to