lhotari opened a new pull request #10400: URL: https://github.com/apache/pulsar/pull/10400
### Motivation

The broker has a bug in calculating permits, #6054.

https://github.com/apache/pulsar/blob/c4f154e79c03cff9055aa4e2ede7748c5952f2bc/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L227-L228

https://github.com/apache/pulsar/blob/875262a0ae2fba82a6dd7d46dd2467d675b0d9c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L527-L528

https://github.com/apache/pulsar/blob/8535dee5ca4d56a5944c068f116d9a32bb6d85f6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L255-L256

As can be seen from these code locations, the permit calculation depends on `batchIndexesAcks.getTotalAckedIndexCount()` returning the correct value.

When reviewing the code, it was noticed that `EntryBatchIndexesAcks` instances are reused, but their state isn't properly cleared before reuse. As a result, `getTotalAckedIndexCount()` includes state left over from the previous usage, which corrupts the permit calculations. Bugs such as #6054 have been reported about this issue.

The only location where the previous state of an `EntryBatchIndexesAcks` instance is cleared is this line of code:

https://github.com/apache/pulsar/blob/4709f3aeaed1bc6a68a9a683c95e0398c940cd54/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L137

This doesn't account for the case where the previous batch was larger than the next one.
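To make the failure mode concrete, here is a minimal, self-contained sketch of how a reused holder can leak counts from an earlier, larger batch. `StaleAckHolder` and all of its members are hypothetical names for illustration; it is not the Pulsar class. It assumes that a set bit marks a batch index that has not been acked yet (so the acked count per entry is the batch size minus the number of set bits), and that the caller only clears the slots it visits in the current batch.

```java
import java.util.BitSet;

// Hypothetical, simplified stand-in for a recycled holder; NOT the Pulsar class.
class StaleAckHolder {
    // A single shared instance stands in for an object pool / recycler.
    private static final StaleAckHolder POOLED = new StaleAckHolder();

    // Per-entry batch size and ack set. Assumption: a set bit means "not acked yet".
    private int[] batchSizes = new int[0];
    private BitSet[] ackSets = new BitSet[0];

    static StaleAckHolder get(int entries) {
        StaleAckHolder holder = POOLED;
        if (holder.ackSets.length < entries) {
            holder.batchSizes = new int[entries];
            holder.ackSets = new BitSet[entries];
        }
        // Nothing is cleared here: slots from an earlier, larger use survive.
        return holder;
    }

    void set(int entryIdx, int batchSize, BitSet ackSet) {
        batchSizes[entryIdx] = batchSize;
        ackSets[entryIdx] = ackSet;
    }

    // Sums over the whole backing array, including slots the current use never touched.
    int totalAckedIndexCount() {
        int count = 0;
        for (int i = 0; i < ackSets.length; i++) {
            if (ackSets[i] != null) {
                count += batchSizes[i] - ackSets[i].cardinality();
            }
        }
        return count;
    }

    public static void main(String[] args) {
        BitSet ninetyFiveBitsSet = new BitSet();
        ninetyFiveBitsSet.set(0, 95); // bits 0..94 set, so 5 of 100 indexes are acked

        StaleAckHolder first = StaleAckHolder.get(10);
        first.set(8, 100, ninetyFiveBitsSet);
        first.set(9, 100, ninetyFiveBitsSet);
        System.out.println(first.totalAckedIndexCount()); // prints 10

        // The next batch has only 2 entries; the caller clears just those 2 slots.
        StaleAckHolder second = StaleAckHolder.get(2);
        second.set(0, 0, null);
        second.set(1, 0, null);
        System.out.println(second.totalAckedIndexCount()); // prints 10: stale slots 8 and 9
    }
}
```

In this model the second use reports 10 acked indexes although it holds none, which is the kind of leftover count that would skew a permit calculation.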
This is a unit test case that reproduces the issue:

```
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, 95);
long[] nintyFiveBitsSet = bitSet.toLongArray();

EntryBatchIndexesAcks acks = EntryBatchIndexesAcks.get(10);
acks.setIndexesAcks(8, Pair.of(100, nintyFiveBitsSet));
acks.setIndexesAcks(9, Pair.of(100, nintyFiveBitsSet));

// 2 entries x (100 - 95) acked indexes each
assertEquals(acks.getTotalAckedIndexCount(), 10);

acks.recycle();

acks = EntryBatchIndexesAcks.get(2);

// there should be no previous state
assertEquals(acks.getTotalAckedIndexCount(), 0);
```

### Modifications

- add a field to `EntryBatchIndexesAcks` for tracking the maximum size, since as a result of reuse the complete array might not be in use
- properly reset state between usages
- add a unit test that reproduces the issue
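The modifications listed above can be sketched in a simplified form as follows. This is an illustration under stated assumptions, not the code in this PR: `SizedAckHolder`, its `size` field, and its methods are hypothetical names, a single shared instance stands in for the recycler, and a set bit is assumed to mean "not acked yet".

```java
import java.util.Arrays;
import java.util.BitSet;

// Hypothetical, simplified sketch of a size-tracking holder; NOT the Pulsar class.
class SizedAckHolder {
    // A single shared instance stands in for an object pool / recycler.
    private static final SizedAckHolder POOLED = new SizedAckHolder();

    // Number of slots in use for the current batch; the array may be longer.
    private int size;
    private int[] batchSizes = new int[0];
    private BitSet[] ackSets = new BitSet[0];

    static SizedAckHolder get(int entries) {
        SizedAckHolder holder = POOLED;
        if (holder.ackSets.length < entries) {
            holder.batchSizes = new int[entries];
            holder.ackSets = new BitSet[entries];
        }
        holder.size = entries;
        return holder;
    }

    void set(int entryIdx, int batchSize, BitSet ackSet) {
        batchSizes[entryIdx] = batchSize;
        ackSets[entryIdx] = ackSet;
    }

    // Only the slots that belong to the current use are counted.
    int totalAckedIndexCount() {
        int count = 0;
        for (int i = 0; i < size; i++) {
            if (ackSets[i] != null) {
                count += batchSizes[i] - ackSets[i].cardinality();
            }
        }
        return count;
    }

    // Reset every slot that was in use before handing the instance back.
    void recycle() {
        Arrays.fill(ackSets, 0, size, null);
        Arrays.fill(batchSizes, 0, size, 0);
        size = 0;
    }

    public static void main(String[] args) {
        BitSet ninetyFiveBitsSet = new BitSet();
        ninetyFiveBitsSet.set(0, 95); // 5 of 100 indexes acked

        SizedAckHolder acks = SizedAckHolder.get(10);
        acks.set(8, 100, ninetyFiveBitsSet);
        acks.set(9, 100, ninetyFiveBitsSet);
        System.out.println(acks.totalAckedIndexCount()); // prints 10
        acks.recycle();

        acks = SizedAckHolder.get(2);
        System.out.println(acks.totalAckedIndexCount()); // prints 0
        acks.recycle();
    }
}
```

Tracking the used size separately from the array length lets the backing array stay allocated for reuse while both the count and the reset stay bounded by what the current batch actually touched.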
