berg223 commented on issue #25145:
URL: https://github.com/apache/pulsar/issues/25145#issuecomment-3798271727

   @YinY1 @lhotari I have reproduced the issue in my branch named
[reproduce_issue_25145](https://github.com/apache/pulsar/commit/437a68a4b5ddb5c9b5949b0d9410613fccb4e901),
and a PR will be submitted in the next few days.
   
   
   Consumer loses messages due to a race condition when acknowledging batch messages.
   There are two threads:
   1. Thread A: receives batch messages from the broker and filters out duplicate messages using the ackSet.
   ```
   org.apache.pulsar.client.impl.ConsumerImpl#receiveIndividualMessagesFromBatch
       org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker#isDuplicate
   ```
   
   2. Thread B: flushes ack requests to the broker and clears the ackSet at the end.
   ```
   org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker#flushAsync
       org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker#newMessageAckCommandAndWrite
           org.apache.pulsar.common.protocol.Commands#newMultiMessageAck
               org.apache.pulsar.common.protocol.Commands#newMultiMessageAckCommon
                   org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable#recycle
   ```
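
   To make the two call chains concrete, here is a minimal, hypothetical model in Java. `BatchIndexAckModel`, its methods, and the use of `java.util.BitSet` in place of `ConcurrentBitSetRecyclable` are assumptions for illustration only, not Pulsar's actual code. It assumes that a set bit means the batch index has not been acknowledged yet, so a cleared bit is read as "already acknowledged", which is how thread A ends up treating a recycled bit set.

   ```java
   import java.util.BitSet;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, simplified model of the two call chains; not Pulsar's real tracker.
   // Assumption: a set bit means "batch index not acknowledged yet".
   // Not thread-safe: java.util.BitSet is used only to illustrate the data flow.
   public class BatchIndexAckModel {
       // "ledgerId:entryId" -> pending batch index acks for that entry
       public final Map<String, BitSet> pendingBatchIndexAcks = new ConcurrentHashMap<>();

       // The application acknowledges one message inside a batch.
       public void ackBatchIndex(String entryKey, int batchSize, int batchIndex) {
           BitSet bits = pendingBatchIndexAcks.computeIfAbsent(entryKey, k -> {
               BitSet b = new BitSet(batchSize);
               b.set(0, batchSize); // every index starts as "not acknowledged"
               return b;
           });
           bits.clear(batchIndex);
       }

       // Thread A path (isDuplicate): look the bit set up, then read the bit.
       // A cleared bit is treated as "already acknowledged", so the message is skipped.
       public boolean isDuplicate(String entryKey, int batchIndex) {
           BitSet bits = pendingBatchIndexAcks.get(entryKey);
           return bits != null && !bits.get(batchIndex);
       }

       // Thread B path (flushAsync -> newMultiMessageAck -> recycle):
       // remove the entry, serialize it for the ack command, then reset the same instance.
       public long[] flush(String entryKey) {
           BitSet bits = pendingBatchIndexAcks.remove(entryKey);
           if (bits == null) {
               return new long[0];
           }
           long[] ackSet = bits.toLongArray(); // stand-in for writing the ack command
           bits.clear();                       // stand-in for recycle(): every bit becomes 0
           return ackSet;
       }
   }
   ```

   Called one after the other, the two paths agree: after `ackBatchIndex("17:33", 1000, 3)` (a made-up batch size), only index 3 reports as a duplicate, and after `flush("17:33")` the entry is gone. The problem only appears when `flush` runs between the map lookup and the bit read inside `isDuplicate`, because thread A is then reading an instance that has just been reset.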
   
   In the race, thread B clears the bit set (via `recycle`) while thread A is still checking it. Thread A then mistakenly believes the message is a duplicate and ignores it, so the message is lost.
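
   The interleaving can be replayed deterministically on a single thread, which avoids depending on timing. This is a hypothetical sketch: `RaceReplay` and `replay` are illustrative names, `java.util.BitSet` stands in for `ConcurrentBitSetRecyclable`, the batch size of 1000 is made up, and it assumes a set bit means "not acknowledged yet". Passing `recycleAfterFlush = false` is only a comparison showing that the in-place reset is what produces the false duplicate; it is not a proposed fix.

   ```java
   import java.util.BitSet;

   // Hypothetical single-threaded replay of the race; not Pulsar code.
   // java.util.BitSet stands in for ConcurrentBitSetRecyclable.
   // Assumption: a set bit means "batch index not acknowledged yet".
   public class RaceReplay {
       public static boolean replay(boolean recycleAfterFlush, int batchSize,
                                    int ackedIndex, int incomingIndex) {
           BitSet shared = new BitSet(batchSize);
           shared.set(0, batchSize);
           shared.clear(ackedIndex); // one message of the batch was acknowledged earlier

           // Thread A (isDuplicate): has already looked up the shared bit set.
           BitSet seenByA = shared;

           // Thread B (flushAsync): sending the ack is omitted here; afterwards the
           // same instance is recycled, which resets every bit to 0.
           if (recycleAfterFlush) {
               shared.clear();
           }

           // Thread A resumes: a cleared bit reads as "already acknowledged".
           return !seenByA.get(incomingIndex);
       }

       public static void main(String[] args) {
           // Index 589 was never acknowledged, so it should be delivered.
           System.out.println(replay(true, 1000, 3, 589));  // true: wrongly treated as a duplicate
           System.out.println(replay(false, 1000, 3, 589)); // false: delivered as expected
       }
   }
   ```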
   
   I have extracted the relevant log lines:
   
   ```
   2628511 2026-01-26T02:04:10,170+0800 [pulsar-client-io-1-5] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] Before pendingIndividualBatchIndexAcks messageId 17:33:6:589

   2628602 2026-01-26T02:04:10,170+0800 [pulsar-client-io-1-5] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] After pendingIndividualBatchIndexAcks messageId 17:33:6:589 isEmpty true

   2628589 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-3] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] polled pending batch index ack, messageId 17:33:6

   2628599 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-3] DEBUG org.apache.pulsar.common.protocol.Commands - before recycle 17:33

   2628601 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-3] DEBUG org.apache.pulsar.common.protocol.Commands - after recycle 17:33

   2628604 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-5] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] complete ackSet check messageId 17:33:6:589
   ```
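
   The log mixes several ID formats for the same entry. Assuming the IDs are printed as `ledgerId:entryId[:partitionIndex[:batchIndex]]` (consistent with `partition-6` in the topic name), `17:33:6:589`, `17:33:6` and `17:33` all refer to ledger 17, entry 33. Read that way, thread B (`pulsar-client-io-1-3`) appears to recycle the bit set for the very batch that thread A (`pulsar-client-io-1-5`) is still checking. The helper below is hypothetical and only parses that assumed layout; it is not a Pulsar API.

   ```java
   // Hypothetical helper for reading the message IDs in the log excerpt; not a Pulsar API.
   // Assumed layout: ledgerId:entryId[:partitionIndex[:batchIndex]]
   public class LogMessageId {
       public final long ledgerId;
       public final long entryId;
       public final int partitionIndex; // -1 when absent
       public final int batchIndex;     // -1 when absent

       private LogMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
           this.partitionIndex = partitionIndex;
           this.batchIndex = batchIndex;
       }

       public static LogMessageId parse(String s) {
           String[] p = s.trim().split(":");
           if (p.length < 2 || p.length > 4) {
               throw new IllegalArgumentException("unexpected message id: " + s);
           }
           long ledger = Long.parseLong(p[0]);
           long entry = Long.parseLong(p[1]);
           int partition = p.length > 2 ? Integer.parseInt(p[2]) : -1;
           int batch = p.length > 3 ? Integer.parseInt(p[3]) : -1;
           return new LogMessageId(ledger, entry, partition, batch);
       }

       // Same ledger and entry means the same stored entry, i.e. the same batch.
       public boolean sameEntry(LogMessageId other) {
           return ledgerId == other.ledgerId && entryId == other.entryId;
       }

       public static void main(String[] args) {
           LogMessageId checked = parse("17:33:6:589"); // thread A's duplicate check
           LogMessageId polled = parse("17:33:6");      // thread B polls the pending ack
           LogMessageId recycled = parse("17:33");      // Commands before/after recycle
           // prints "589 true true"
           System.out.println(checked.batchIndex + " " + checked.sameEntry(polled) + " " + checked.sameEntry(recycled));
       }
   }
   ```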
   
   
   


-- 
This is an automated message from the Apache Git Service.