This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 735f700f93a529a0b150057bbf46d72c93536d46
Author: congbo <[email protected]>
AuthorDate: Sun Jan 30 11:09:23 2022 +0800

    [Transaction] Fix individual ack with transaction decrease 
unAckMessageCounnt (#14020)
    
    link https://github.com/apache/pulsar/pull/13383
    ## Motivation
    #13383 has fixed  the batch message ack does not decrease the unacked-msg 
count, but ack with transaction don't fix
    because decrease unAckMessageCount move to another method. ack with 
transaction can't decrease unackMessageCount.
    
    (cherry picked from commit 1e2ff8a3941b7cc6d583f528ceedc393b7e607fb)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 34 +++++++----
 .../client/impl/TransactionEndToEndTest.java       | 70 +++++++++++++++++++---
 2 files changed, 83 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d3fa495..630caac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -414,16 +414,7 @@ public class Consumer {
                 }
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
-                if (Subscription.isIndividualAckMode(subType) && 
isAcknowledgmentAtBatchIndexLevelEnabled) {
-                    long[] cursorAckSet = getCursorAckSet(position);
-                    if (cursorAckSet != null) {
-                        ackedCount = 
getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
-                    } else {
-                        ackedCount = batchSize;
-                    }
-                } else {
-                    ackedCount = batchSize;
-                }
+                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position);
             }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
@@ -465,14 +456,19 @@ public class Consumer {
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
+            long ackedCount = 0;
+            long batchSize = getBatchSize(msgId);
+            Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
             if (msgId.getAckSetsCount() > 0) {
-                long[] acksSets = new long[msgId.getAckSetsCount()];
+                long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
-                    acksSets[j] = msgId.getAckSetAt(j);
+                    ackSets[j] = msgId.getAckSetAt(j);
                 }
-                position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId(), acksSets);
+                position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId(), ackSets);
+                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, 
batchSize, ackSets);
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
+                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position);
             }
 
             if (msgId.hasBatchIndex()) {
@@ -481,6 +477,8 @@ public class Consumer {
                 positionsAcked.add(new MutablePair<>(position, 0));
             }
 
+            addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+
             checkCanRemovePendingAcksAndHandle(position, msgId);
 
             checkAckValidationError(ack, position);
@@ -520,6 +518,16 @@ public class Consumer {
         return batchSize;
     }
 
+    private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl 
position) {
+        if (Subscription.isIndividualAckMode(subType) && 
isAcknowledgmentAtBatchIndexLevelEnabled) {
+            long[] cursorAckSet = getCursorAckSet(position);
+            if (cursorAckSet != null) {
+                return getAckedCountForBatchIndexLevelEnabled(position, 
batchSize, EMPTY_ACK_SET);
+            }
+        }
+        return batchSize;
+    }
+
     private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, 
long batchSize, long[] ackSets) {
         long ackedCount = 0;
         if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index f52d319..1f2bd06 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -30,6 +30,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -97,16 +98,12 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void noBatchProduceCommitTest() throws Exception {
-        produceCommitTest(false);
-    }
-
-    @Test
-    public void batchProduceCommitTest() throws Exception {
-        produceCommitTest(true);
+    @DataProvider(name = "enableBatch")
+    public Object[][] enableBatch() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
+    @Test(dataProvider="enableBatch")
     private void produceCommitTest(boolean enableBatch) throws Exception {
         @Cleanup
         Consumer<byte[]> consumer = pulsarClient
@@ -249,6 +246,63 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         log.info("finished test partitionAbortTest");
     }
 
+    @Test(dataProvider="enableBatch")
+    private void testAckWithTransactionReduceUnAckMessageCount(boolean 
enableBatch) throws Exception {
+
+        final int messageCount = 50;
+        final String subName = "testAckWithTransactionReduceUnAckMessageCount";
+        final String topicName = NAMESPACE1 + 
"/testAckWithTransactionReduceUnAckMessageCount-" + enableBatch;
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        Awaitility.await().until(consumer::isConnected);
+
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topicName)
+                .enableBatching(enableBatch)
+                .batchingMaxMessages(10)
+                .create();
+
+        CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+        for (int i = 0; i < messageCount; i++) {
+            producer.sendAsync((i + 
"").getBytes()).thenRun(countDownLatch::countDown);
+        }
+
+        countDownLatch.await();
+
+        Transaction txn = getTxn();
+
+        for (int i = 0; i < messageCount / 2; i++) {
+            Message<byte[]> message = consumer.receive();
+            consumer.acknowledgeAsync(message.getMessageId(), txn).get();
+        }
+
+        txn.commit().get();
+        boolean flag = false;
+        String topic = TopicName.get(topicName).toString();
+        for (int i = 0; i < getPulsarServiceList().size(); i++) {
+            CompletableFuture<Optional<Topic>> topicFuture = 
getPulsarServiceList().get(i)
+                    .getBrokerService().getTopic(topic, false);
+
+            if (topicFuture != null) {
+                Optional<Topic> topicOptional = topicFuture.get();
+                if (topicOptional.isPresent()) {
+                    PersistentSubscription persistentSubscription =
+                            (PersistentSubscription) 
topicOptional.get().getSubscription(subName);
+                    
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
messageCount / 2);
+                    flag = true;
+                }
+            }
+        }
+        assertTrue(flag);
+    }
+
     @Test
     public void txnIndividualAckTestNoBatchAndSharedSub() throws Exception {
         txnAckTest(false, 1, SubscriptionType.Shared);

Reply via email to