3pacccccc opened a new issue, #25107: URL: https://github.com/apache/pulsar/issues/25107
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

master branch

### Issue Description

When using Pulsar transactions with `transaction.abort()`, I observed duplicate message consumption when batching is enabled. After a transaction containing an acknowledged message is aborted, other messages that were already acknowledged outside the transaction are redelivered and consumed again, leading to duplicate processing.

### Error messages

```text
no error message
```

### Reproducing the issue

Add the following test to `TransactionConsumeTest` to reproduce the issue:

```java
@Test
public void duplicateMessageConsume() throws Exception {
    final String topic = "persistent://public/txn/duplicateMessageConsume";

    @Cleanup
    PulsarClient pulsarClient1 = PulsarClient.builder()
            .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
            .enableTransaction(true)
            .build();

    @Cleanup
    Producer<String> producer = pulsarClient1
            .newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(true)
            .batchingMaxMessages(5)
            .create();

    Consumer<String> consumer = pulsarClient1
            .newConsumer(Schema.STRING)
            .subscriptionType(SubscriptionType.Shared)
            .topic(topic)
            .subscriptionName("mySub")
            .subscribe();

    Transaction transaction = pulsarClient1.newTransaction()
            .withTransactionTimeout(5, TimeUnit.HOURS)
            .build()
            .get();

    // Send 10 messages (two batches of 5)
    for (int j = 0; j < 10; j++) {
        producer.newMessage()
                .value(("value-" + j))
                .sendAsync();
    }

    // Consume messages
    for (int j = 0; j < 20; j++) {
        Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
        if (receive == null) {
            break;
        }
        if (j == 0) {
            // Acknowledge the first message in the transaction, then abort the transaction
            consumer.acknowledgeAsync(receive.getMessageId(), transaction).get();
            System.out.println("receive1: " + new String(receive.getData())
                    + ", msgId: " + receive.getMessageId());
            transaction.abort().get();
            // Sleep 2s to ensure the remaining messages are dispatched before they are acked.
            Thread.sleep(2000);
        } else {
            // Acknowledge the other messages normally
            consumer.acknowledge(receive.getMessageId());
            System.out.println("receive2: " + new String(receive.getData())
                    + ", msgId: " + receive.getMessageId());
        }
    }
}
```

The console output is:

```
receive1: value-0, msgId: 12:0:-1:0
receive2: value-1, msgId: 12:0:-1:1
receive2: value-2, msgId: 12:0:-1:2
receive2: value-3, msgId: 12:0:-1:3
receive2: value-4, msgId: 12:0:-1:4
receive2: value-5, msgId: 12:1:-1:0
receive2: value-6, msgId: 12:1:-1:1
receive2: value-7, msgId: 12:1:-1:2
receive2: value-8, msgId: 12:1:-1:3
receive2: value-9, msgId: 12:1:-1:4
receive2: value-0, msgId: 12:0:-1:0
receive2: value-1, msgId: 12:0:-1:1 # Duplicate!
receive2: value-2, msgId: 12:0:-1:2 # Duplicate!
receive2: value-3, msgId: 12:0:-1:3 # Duplicate!
receive2: value-4, msgId: 12:0:-1:4 # Duplicate!
```

### Additional information

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
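
To make the reproduction fail on its own instead of relying on reading the console output, the received message IDs could be collected as strings (for example via `receive.getMessageId().toString()`) and checked for repeats after the loop. The following is only a minimal sketch: `DuplicateDetector` and `findDuplicates` are hypothetical names, not part of Pulsar or of `TransactionConsumeTest`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Pulsar code base.
final class DuplicateDetector {
    private DuplicateDetector() {
    }

    /**
     * Returns every ID that appears more than once in the given delivery
     * sequence, ordered by when its first repeat was observed.
     */
    static List<String> findDuplicates(List<String> receivedIds) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String id : receivedIds) {
            // Set.add returns false when the ID was already delivered once.
            if (!seen.add(id)) {
                duplicates.add(id);
            }
        }
        return new ArrayList<>(duplicates);
    }
}
```

In the test, the ID of the message whose acknowledgement was aborted (`value-0`, which the output above does not flag as a duplicate) would presumably be removed from the result before asserting that the list is empty.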
