This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit faac51937d2f9c95811e85446b3319f877dc9fd4 Author: Xiangying Meng <[email protected]> AuthorDate: Tue Jun 7 12:45:00 2022 +0800 [fix][txn]Fix transaction ack batch message (#15875) Fixes https://github.com/apache/pulsar/issues/15832 ### Motivation The transaction needs the batch size to help determine whether the batch message is in the pending ack state. ### Modifications Returns the batch size of messageID directly. (cherry picked from commit f87b3708ae6f05a8d4d4d6cd0db1090724fbcf4b) --- .../org/apache/pulsar/broker/service/Consumer.java | 7 ++- .../pendingack/PendingAckPersistentTest.java | 71 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 031574975d1..f2a4677dbfc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -478,8 +478,11 @@ public class Consumer { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); ackedCount = batchSize; } - - positionsAcked.add(new MutablePair<>(position, (int) batchSize)); + if (msgId.hasBatchSize()) { + positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); + } else { + positionsAcked.add(new MutablePair<>(position, (int) batchSize)); + } addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 6683be138da..bd22ff423a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -45,8 
+45,10 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -574,4 +576,73 @@ public class PendingAckPersistentTest extends TransactionTestBase { assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID())); } + + @Test + public void testTransactionConflictExceptionWhenAckBatchMessage() throws Exception { + String topic = TopicName.get(TopicDomain.persistent.toString(), + NamespaceName.get(NAMESPACE1), "test").toString(); + + String subscriptionName = "my-subscription-batch"; + pulsarServiceList.get(0).getBrokerService() + .getManagedLedgerConfig(TopicName.get(topic)).get() + .setDeletionAtBatchIndexLevelEnabled(true); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(true) + .batchingMaxMessages(3) + // set batch max publish delay big enough to make sure entry has 3 messages + .batchingMaxPublishDelay(10, TimeUnit.SECONDS) + .topic(topic).create(); + + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(subscriptionName) + .enableBatchIndexAcknowledgment(true) + .subscriptionType(SubscriptionType.Exclusive) + .isAckReceiptEnabled(true) + .topic(topic) + .subscribe(); + + List<MessageId> messageIds = new ArrayList<>(); + List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>(); + + List<String> messages = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String message = "my-message-" + i; + 
messages.add(message); + CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message); + futureMessageIds.add(messageIdCompletableFuture); + } + + for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) { + MessageId messageId = futureMessageId.get(); + messageIds.add(messageId); + } + + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.DAYS) + .build() + .get(); + + Message<String> message1 = consumer.receive(); + Message<String> message2 = consumer.receive(); + + BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId(); + consumer.acknowledgeAsync(messageId, transaction).get(); + + Transaction transaction2 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.DAYS) + .build() + .get(); + transaction.commit().get(); + + try { + consumer.acknowledgeAsync(messageId, transaction2).get(); + fail(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); + } + } + }
