This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 91dac3e244e1342110b4199ae38a819501d3df0d Author: congbo <[email protected]> AuthorDate: Thu Sep 1 12:29:04 2022 +0800 [fix][txn] fix ack with txn compute ackedCount error (#17016) Co-authored-by: congbobo184 <[email protected]> (cherry picked from commit 176b0d6e9e0d647c611cfdd359e5088ccb58788c) --- .../org/apache/pulsar/broker/service/Consumer.java | 25 ++++++------ .../client/impl/TransactionEndToEndTest.java | 46 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 25dd3b908e5..1659d25c74e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -480,25 +480,28 @@ public class Consumer { LongAdder totalAckCount = new LongAdder(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); - PositionImpl position; + PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + // acked count at least one long ackedCount = 0; - long batchSize = getBatchSize(msgId); + long batchSize = 0; + if (msgId.hasBatchSize()) { + batchSize = msgId.getBatchSize(); + // ack batch messages set ackeCount = batchSize + ackedCount = msgId.getBatchSize(); + positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); + } else { + // ack no batch message set ackedCount = 1 + ackedCount = 1; + positionsAcked.add(new MutablePair<>(position, (int) batchSize)); + } Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); if (msgId.getAckSetsCount() > 0) { long[] ackSets = new long[msgId.getAckSetsCount()]; for (int j = 0; j < msgId.getAckSetsCount(); j++) { ackSets[j] = msgId.getAckSetAt(j); } - position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); + position.setAckSet(ackSets); ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); - } else { - position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = batchSize; - } - if (msgId.hasBatchSize()) { - positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); - } else { - positionsAcked.add(new MutablePair<>(position, (int) batchSize)); } addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 2d174b9b1fb..c7ae3a53109 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -1110,6 +1110,52 @@ public class TransactionEndToEndTest extends TransactionTestBase { } } + @Test + public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception { + final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks"; + final String subName = "test"; + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic(topic) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .sendTimeout(1, TimeUnit.SECONDS) + .create(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscribe(); + + // send 5 messages with one batch + for (int i = 0; i < 5; i++) { + producer.sendAsync((i + "").getBytes(UTF_8)); + } + + List<MessageId> messageIds = new ArrayList<>(); + + // receive the batch messages add to a list + for (int i = 0; i < 5; i++) { + messageIds.add(consumer.receive().getMessageId()); + } + + MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0); + + + // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck + getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) + .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks() + .remove(messageId.ledgerId, messageId.entryId); + + Transaction txn = getTxn(); + consumer.acknowledgeAsync(messageIds.get(1), txn).get(); + + // ack one message, the unack count is 4 + assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) + .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4); + } + @Test public void testSendTxnAckMessageToDLQ() throws Exception { String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";
