nborisov commented on issue #23845:
URL: https://github.com/apache/pulsar/issues/23845#issuecomment-3265648719

   Hi @lhotari! I've managed to reproduce the issue where a message is left unacknowledged by consumers across their restarts, despite your recent fix.
   
   
   The unacknowledged message is added to the draining hashes tracker during such restarts here:
   ```
   private synchronized void registerDrainingHashes(Consumer skipConsumer,
                                                    ImpactedConsumersResult impactedConsumers) {
       impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
           if (c != skipConsumer) {
               c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
                   if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
                       log.warn("[{}] Sticky key hash was missing for {}:{}", getName(), ledgerId, entryId);
                       return;
                   }
                   if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
                       // add the pending ack to the draining hashes tracker if the hash is in the range
                       drainingHashesTracker.addEntry(c, stickyKeyHash);
                   }
               });
           }
       });
   }
   ```
   
   As a result, `org.apache.pulsar.broker.service.DrainingHashesTracker.DrainingHashEntry#refCount` is incremented multiple times.
   When the consumer gets disconnected, this `refCount` is decremented only once, so the entry is not removed from `org.apache.pulsar.broker.service.DrainingHashesTracker#drainingHashes`. Consequently, the message is never delivered to a consumer until the broker is restarted. Restarting the consumers does not help.
   
   What happens, step by step, when the issue is reproduced:
   
   1. Multiple consumers are running.
   2. A message is produced and delivered to consumer 1, but consumer 1 does not ack it.
   3. A consumer is added.
   4. Consumer 1 is stopped.
   5. The message is assigned to consumer 2, but consumer 2 does not ack it.
   6. A new consumer is added.
   7. The message's sticky hash appears in `org.apache.pulsar.broker.service.ImpactedConsumersResult#removedHashRanges`.
   8. The message's sticky hash is added to `drainingHashes` in `org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers#registerDrainingHashes`.
   9. Another consumer is added.
   10. The message's sticky hash appears in `org.apache.pulsar.broker.service.ImpactedConsumersResult#removedHashRanges` again.
   11. The message's sticky hash is added to `drainingHashes` again in `org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers#registerDrainingHashes`, and the refCount of the hash is incremented.
   
   As a result, `drainingHashes` contains a key with a refCount of 2, while only one message is actually being drained.
   
   When the consumer that holds the message in its pending acks stops, `drainingHashes` still contains the key with a refCount of 1, which will never be decremented because no consumer has the message in its pending acks anymore.
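
The counting imbalance described above can be illustrated with a minimal, self-contained sketch. This is not Pulsar code: `Tracker`, `addEntry(int)`, `reduceRefCount(int)`, and the hash value `42` are hypothetical stand-ins that only model a per-hash reference count. The real `DrainingHashesTracker` has a different API and also tracks the consumer.

```java
import java.util.HashMap;
import java.util.Map;

public class DrainingRefCountSketch {

    // Hypothetical stand-in for the draining hashes tracker: sticky hash -> refCount.
    static final class Tracker {
        private final Map<Integer, Integer> refCounts = new HashMap<>();

        // Called once per registration pass that sees the pending ack.
        void addEntry(int stickyKeyHash) {
            refCounts.merge(stickyKeyHash, 1, Integer::sum);
        }

        // Decrements once; the entry is removed only when the count reaches zero.
        void reduceRefCount(int stickyKeyHash) {
            refCounts.computeIfPresent(stickyKeyHash, (h, c) -> c > 1 ? c - 1 : null);
        }

        int refCount(int stickyKeyHash) {
            return refCounts.getOrDefault(stickyKeyHash, 0);
        }

        boolean isDraining(int stickyKeyHash) {
            return refCounts.containsKey(stickyKeyHash);
        }
    }

    // Replays steps 8 and 11 plus the final consumer stop for a single pending message.
    static Tracker simulate() {
        Tracker tracker = new Tracker();
        int hash = 42;                // assumed sticky hash of the one unacked message
        tracker.addEntry(hash);       // step 8: first registration
        tracker.addEntry(hash);       // step 11: same pending ack registered again
        tracker.reduceRefCount(hash); // the consumer holding the pending ack stops
        return tracker;
    }

    public static void main(String[] args) {
        Tracker tracker = simulate();
        System.out.println("refCount=" + tracker.refCount(42)
                + ", stillDraining=" + tracker.isDraining(42));
    }
}
```

In this model, one pending message is counted twice but released once, so the hash stays in the draining set with nothing left to release it.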
   
   Unfortunately, the issue does not reproduce consistently. I am still trying to find the root cause.
   I hope this input is helpful for the investigation.
   
   

