This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 735f700f93a529a0b150057bbf46d72c93536d46 Author: congbo <[email protected]> AuthorDate: Sun Jan 30 11:09:23 2022 +0800 [Transaction] Fix individual ack with transaction decrease unAckMessageCounnt (#14020) link https://github.com/apache/pulsar/pull/13383 ## Motivation #13383 fixed batch message acks not decreasing the unacked-message count, but acks with a transaction were not fixed, because the logic that decreases unAckMessageCount was moved to another method. As a result, an ack with a transaction cannot decrease unAckMessageCount. (cherry picked from commit 1e2ff8a3941b7cc6d583f528ceedc393b7e607fb) --- .../org/apache/pulsar/broker/service/Consumer.java | 34 +++++++---- .../client/impl/TransactionEndToEndTest.java | 70 +++++++++++++++++++--- 2 files changed, 83 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index d3fa495..630caac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -414,16 +414,7 @@ public class Consumer { } } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); - if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) { - long[] cursorAckSet = getCursorAckSet(position); - if (cursorAckSet != null) { - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET); - } else { - ackedCount = batchSize; - } - } else { - ackedCount = batchSize; - } + ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position); } addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); @@ -465,14 +456,19 @@ public class Consumer { for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; + long ackedCount = 0; + long batchSize = getBatchSize(msgId); + Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); if (msgId.getAckSetsCount() > 0) { - long[] acksSets = new long[msgId.getAckSetsCount()]; + long[] ackSets = new long[msgId.getAckSetsCount()]; for (int j = 0; j < msgId.getAckSetsCount(); j++) { - acksSets[j] = msgId.getAckSetAt(j); + ackSets[j] = msgId.getAckSetAt(j); } - position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), acksSets); + position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); + ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets); } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position); } if (msgId.hasBatchIndex()) { @@ -481,6 +477,8 @@ public class Consumer { positionsAcked.add(new MutablePair<>(position, 0)); } + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + checkCanRemovePendingAcksAndHandle(position, msgId); checkAckValidationError(ack, position); @@ -520,6 +518,16 @@ public class Consumer { return batchSize; } + private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) { + if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) { + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet != null) { + return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET); + } + } + return batchSize; + } + private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) { long ackedCount = 0; if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index f52d319..1f2bd06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java 
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -97,16 +98,12 @@ public class TransactionEndToEndTest extends TransactionTestBase { super.internalCleanup(); } - @Test - public void noBatchProduceCommitTest() throws Exception { - produceCommitTest(false); - } - - @Test - public void batchProduceCommitTest() throws Exception { - produceCommitTest(true); + @DataProvider(name = "enableBatch") + public Object[][] enableBatch() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } + @Test(dataProvider="enableBatch") private void produceCommitTest(boolean enableBatch) throws Exception { @Cleanup Consumer<byte[]> consumer = pulsarClient @@ -249,6 +246,63 @@ public class TransactionEndToEndTest extends TransactionTestBase { log.info("finished test partitionAbortTest"); } + @Test(dataProvider="enableBatch") + private void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch) throws Exception { + + final int messageCount = 50; + final String subName = "testAckWithTransactionReduceUnAckMessageCount"; + final String topicName = NAMESPACE1 + "/testAckWithTransactionReduceUnAckMessageCount-" + enableBatch; + @Cleanup + Consumer<byte[]> consumer = pulsarClient + .newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .subscribe(); + Awaitility.await().until(consumer::isConnected); + + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(topicName) + .enableBatching(enableBatch) + .batchingMaxMessages(10) + .create(); + + CountDownLatch countDownLatch = new CountDownLatch(messageCount); + for (int i = 0; i < messageCount; i++) { + 
producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); + } + + countDownLatch.await(); + + Transaction txn = getTxn(); + + for (int i = 0; i < messageCount / 2; i++) { + Message<byte[]> message = consumer.receive(); + consumer.acknowledgeAsync(message.getMessageId(), txn).get(); + } + + txn.commit().get(); + boolean flag = false; + String topic = TopicName.get(topicName).toString(); + for (int i = 0; i < getPulsarServiceList().size(); i++) { + CompletableFuture<Optional<Topic>> topicFuture = getPulsarServiceList().get(i) + .getBrokerService().getTopic(topic, false); + + if (topicFuture != null) { + Optional<Topic> topicOptional = topicFuture.get(); + if (topicOptional.isPresent()) { + PersistentSubscription persistentSubscription = + (PersistentSubscription) topicOptional.get().getSubscription(subName); + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + flag = true; + } + } + } + assertTrue(flag); + } + @Test public void txnIndividualAckTestNoBatchAndSharedSub() throws Exception { txnAckTest(false, 1, SubscriptionType.Shared);
