lhotari opened a new pull request #10400:
URL: https://github.com/apache/pulsar/pull/10400


   ### Motivation
   
   The broker has a bug in calculating permits (see #6054).
   
   
https://github.com/apache/pulsar/blob/c4f154e79c03cff9055aa4e2ede7748c5952f2bc/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L227-L228
   
   
https://github.com/apache/pulsar/blob/875262a0ae2fba82a6dd7d46dd2467d675b0d9c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L527-L528
   
   
https://github.com/apache/pulsar/blob/8535dee5ca4d56a5944c068f116d9a32bb6d85f6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L255-L256
   
   As these permit calculations show, it is important that `batchIndexesAcks.getTotalAckedIndexCount()` returns the correct value.
   
   While reviewing the code, I noticed that `EntryBatchIndexesAcks` instances are reused. The problem is that their state isn't properly cleared before reuse, so `getTotalAckedIndexCount()` includes state from the previous usage in its result. This corrupts the permit calculations. Bugs such as #6054 have been reported about this issue.
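To make the effect on flow control concrete, here is a small arithmetic sketch. It assumes, for illustration only, that a dispatch reduces the available permits by `totalMessages - totalAckedIndexCount`; the exact expressions are in the linked files, and the class and method names below are hypothetical, not Pulsar's API.

```java
// Hypothetical model of permit accounting; not Pulsar code.
public class PermitAccountingSketch {

    // Assumed shape of the update: messages whose batch indexes are already
    // acknowledged do not consume permits.
    static int permitsAfterDispatch(int availablePermits, int totalMessages, int totalAckedIndexCount) {
        return availablePermits - (totalMessages - totalAckedIndexCount);
    }

    public static void main(String[] args) {
        int available = 1000;
        // Two batch entries of 100 messages each, none of them acknowledged yet.
        int totalMessages = 200;

        int correct = permitsAfterDispatch(available, totalMessages, 0);
        // Stale state from a previous usage reports 10 acked indexes that do not exist.
        int corrupted = permitsAfterDispatch(available, totalMessages, 10);

        System.out.println("correct permits:   " + correct);   // 800
        System.out.println("corrupted permits: " + corrupted); // 810
    }
}
```

In this model, every dispatch that sees stale acked indexes deducts too few permits, so the error grows with each affected dispatch instead of cancelling out.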
   
   The only location where the previous state of the `EntryBatchIndexesAcks` 
instance is cleared is this line of code:
   
https://github.com/apache/pulsar/blob/4709f3aeaed1bc6a68a9a683c95e0398c940cd54/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L137
   
   This doesn't take into account the case where the previous batch was larger than the next one. The following unit test reproduces the issue:
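   ```java
   import static org.testng.Assert.assertEquals;

   import org.apache.commons.lang3.tuple.Pair;
   import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
   import org.apache.pulsar.common.util.collections.BitSetRecyclable;

   // Inside a TestNG test method:
   BitSetRecyclable bitSet = BitSetRecyclable.create();
   bitSet.set(0, 95);
   long[] ninetyFiveBitsSet = bitSet.toLongArray();
   bitSet.recycle();

   // Two batch entries of 100 messages, each with 95 of 100 bits set;
   // the test expects 10 acked indexes in total.
   EntryBatchIndexesAcks acks = EntryBatchIndexesAcks.get(10);
   acks.setIndexesAcks(8, Pair.of(100, ninetyFiveBitsSet));
   acks.setIndexesAcks(9, Pair.of(100, ninetyFiveBitsSet));

   assertEquals(acks.getTotalAckedIndexCount(), 10);

   acks.recycle();

   acks = EntryBatchIndexesAcks.get(2);
   // there should be no previous state
   assertEquals(acks.getTotalAckedIndexCount(), 0);
   ```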
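The same failure can be reproduced without any Pulsar dependencies. The following is a deliberately simplified, hypothetical model, not the actual `EntryBatchIndexesAcks` source: a pooled holder keeps a reusable array of per-entry `(batchSize, bitSet)` values, and only the slots of the current batch are cleared before reuse. For brevity the clearing is folded into `get()`, although in the broker it happens in the dispatcher. The model also assumes the acked count per entry is `batchSize - cardinality`, which matches the test's expectation of 10 for two entries with 95 of 100 bits set.

```java
import java.util.ArrayDeque;
import java.util.BitSet;

// Hypothetical, simplified model of the buggy reuse pattern; not Pulsar's code.
public class StaleAcksSketch {

    // Per-entry state: batch size plus a bit set of indexes that are still unacknowledged.
    static final class EntryAck {
        final int batchSize;
        final BitSet unacked;

        EntryAck(int batchSize, BitSet unacked) {
            this.batchSize = batchSize;
            this.unacked = unacked;
        }
    }

    static final class Acks {
        private static final ArrayDeque<Acks> POOL = new ArrayDeque<>();
        EntryAck[] slots = new EntryAck[0];

        static Acks get(int entries) {
            Acks acks = POOL.isEmpty() ? new Acks() : POOL.pop();
            if (acks.slots.length < entries) {
                acks.slots = new EntryAck[entries];
            }
            // Only the slots of the *current* batch are reset;
            // slots at index >= entries keep values from earlier usages.
            for (int i = 0; i < entries; i++) {
                acks.slots[i] = null;
            }
            return acks;
        }

        void set(int entryIdx, EntryAck ack) {
            slots[entryIdx] = ack;
        }

        // Sums over the whole array, so leftover slots are counted too.
        int totalAckedIndexCount() {
            int count = 0;
            for (EntryAck ack : slots) {
                if (ack != null) {
                    count += ack.batchSize - ack.unacked.cardinality();
                }
            }
            return count;
        }

        void recycle() {
            POOL.push(this);
        }
    }

    public static void main(String[] args) {
        BitSet ninetyFiveBitsSet = new BitSet();
        ninetyFiveBitsSet.set(0, 95);

        Acks acks = Acks.get(10);
        acks.set(8, new EntryAck(100, ninetyFiveBitsSet));
        acks.set(9, new EntryAck(100, ninetyFiveBitsSet));
        System.out.println(acks.totalAckedIndexCount()); // 10
        acks.recycle();

        acks = Acks.get(2);
        System.out.println(acks.totalAckedIndexCount()); // 10, although it should be 0
    }
}
```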
   
   ### Modifications
   
   - add a field to `EntryBatchIndexesAcks` that tracks the maximum size used, since, as a result of reuse, the complete array might not be in use
   - properly reset state between usages
   - add a unit test that reproduces the issue
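The first two modifications can be illustrated in the same simplified model. This is a hypothetical sketch of the idea, not the patch itself: the `maxSize` field, the pool, and all class names are made up for the example. `maxSize` acts as a high-water mark of the slots a usage may have written, and recycling clears every slot up to that mark rather than only the slots of the most recent batch.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.BitSet;

// Hypothetical, simplified model of the fix; names are illustrative only.
public class ResetAcksSketch {

    // Per-entry state: batch size plus a bit set of indexes that are still unacknowledged.
    static final class EntryAck {
        final int batchSize;
        final BitSet unacked;

        EntryAck(int batchSize, BitSet unacked) {
            this.batchSize = batchSize;
            this.unacked = unacked;
        }
    }

    static final class Acks {
        private static final ArrayDeque<Acks> POOL = new ArrayDeque<>();
        EntryAck[] slots = new EntryAck[0];
        // High-water mark: number of leading slots this usage may have written.
        int maxSize;

        static Acks get(int entries) {
            Acks acks = POOL.isEmpty() ? new Acks() : POOL.pop();
            if (acks.slots.length < entries) {
                acks.slots = new EntryAck[entries];
            }
            acks.maxSize = Math.max(acks.maxSize, entries);
            return acks;
        }

        void set(int entryIdx, EntryAck ack) {
            slots[entryIdx] = ack;
            maxSize = Math.max(maxSize, entryIdx + 1);
        }

        // Only the slots in use are considered.
        int totalAckedIndexCount() {
            int count = 0;
            for (int i = 0; i < maxSize; i++) {
                EntryAck ack = slots[i];
                if (ack != null) {
                    count += ack.batchSize - ack.unacked.cardinality();
                }
            }
            return count;
        }

        void recycle() {
            // Clear everything this usage may have written, then reset the mark.
            Arrays.fill(slots, 0, maxSize, null);
            maxSize = 0;
            POOL.push(this);
        }
    }

    public static void main(String[] args) {
        BitSet ninetyFiveBitsSet = new BitSet();
        ninetyFiveBitsSet.set(0, 95);

        Acks acks = Acks.get(10);
        acks.set(8, new EntryAck(100, ninetyFiveBitsSet));
        acks.set(9, new EntryAck(100, ninetyFiveBitsSet));
        System.out.println(acks.totalAckedIndexCount()); // 10
        acks.recycle();

        acks = Acks.get(2);
        System.out.println(acks.totalAckedIndexCount()); // 0
    }
}
```

Tracking a high-water mark keeps the reset proportional to the slots actually used instead of the full (possibly much larger) reused array.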

