3pacccccc opened a new issue, #25107:
URL: https://github.com/apache/pulsar/issues/25107

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   master branch
   
   ### Issue Description
   
   When using Pulsar transactions with `transaction.abort()`, I observed 
duplicate message consumption in certain scenarios. After aborting a 
transaction that contains acknowledged messages, those messages can be 
redelivered and consumed again, leading to duplicate processing.
   
   
   
   ### Error messages
   
   ```text
   no error message
   ```
   
   ### Reproducing the issue
   
   Use the following code to reproduce the issue in `TransactionConsumeTest`
   ```java
   @Test
   public void duplicateMessageConsume() throws Exception {
       final String topic = "persistent://public/txn/duplicateMessageConsume";
       
       @Cleanup
       PulsarClient pulsarClient1 = PulsarClient.builder()
               .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
               .enableTransaction(true)
               .build();
       
       @Cleanup
       Producer<String> producer = pulsarClient1
               .newProducer(Schema.STRING)
               .topic(topic)
               .enableBatching(true)
               .batchingMaxMessages(5)
               .create();
       
       Consumer<String> consumer = pulsarClient1
               .newConsumer(Schema.STRING)
               .subscriptionType(SubscriptionType.Shared)
               .topic(topic)
               .subscriptionName("mySub")
               .subscribe();
   
       Transaction transaction = pulsarClient1.newTransaction()
               .withTransactionTimeout(5, TimeUnit.HOURS)
               .build()
               .get();
   
       // Send 10 messages
       for (int j = 0; j < 10; j++) {
           producer.newMessage()
                   .value(("value-" + j))
                   .sendAsync();
       }
   
       // Consume messages
       for (int j = 0; j < 20; j++) {
           Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
           if (receive == null) {
               break;
           }
           if (j == 0) {
               // Acknowledge first message in transaction, then abort the 
transaction
               consumer.acknowledgeAsync(receive.getMessageId(), 
transaction).get();
               System.out.println("receive1: " + new String(receive1.getData()) 
+ ", msgId: " + receive1.getMessageId());
               transaction.abort().get();
               // sleep 2s to ensure dispath message before ack.
               Thread.sleep(2000);
           } else {
               // Acknowledge other messages normally
               consumer.acknowledge(receive1.getMessageId());
               System.out.println("receive: " + new String(receive.getData()) + 
", msgId: " + receive.getMessageId());
           }
       }
   }
   ```
   and you can see the console output:
   ```
   receive1: value-0, msgId: 12:0:-1:0
   receive2: value-1, msgId: 12:0:-1:1
   receive2: value-2, msgId: 12:0:-1:2
   receive2: value-3, msgId: 12:0:-1:3
   receive2: value-4, msgId: 12:0:-1:4
   receive2: value-5, msgId: 12:1:-1:0
   receive2: value-6, msgId: 12:1:-1:1
   receive2: value-7, msgId: 12:1:-1:2
   receive2: value-8, msgId: 12:1:-1:3
   receive2: value-9, msgId: 12:1:-1:4
   receive2: value-0, msgId: 12:0:-1:0 
   receive2: value-1, msgId: 12:0:-1:1  # Duplicate!
   receive2: value-2, msgId: 12:0:-1:2  # Duplicate!
   receive2: value-3, msgId: 12:0:-1:3  # Duplicate!
   receive2: value-4, msgId: 12:0:-1:4  # Duplicate!
   ```
   
   ### Additional information
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to