This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b068124aef261848c0bb0af0b903c798499015ef Author: Ismael Juma <[email protected]> AuthorDate: Thu Apr 28 06:13:23 2022 -0700 KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096) Conceptually, the ordering is defined by the producer id, producer epoch and the sequence number. This set should generally only have entries for the same producer id and epoch, but there is one case where we can have conflicting `remove` calls and hence we add this as a temporary safe fix. We'll follow up with a fix that ensures the original intended invariant. Reviewers: Jason Gustafson <[email protected]>, David Jacot <[email protected]>, Luke Chen <[email protected]> --- .../clients/producer/internals/TransactionManager.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 7086e593a8..c032cabe98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -184,11 +184,14 @@ public class TransactionManager { // responses which are due to the retention period elapsing, and those which are due to actual lost data. private long lastAckedOffset; - private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> { - if (b1.baseSequence() < b2.baseSequence()) return -1; - else if (b1.baseSequence() > b2.baseSequence()) return 1; - else return Integer.compare(b1.hashCode(), b2.hashCode()); - }; + // `inflightBatchesBySequence` should only have batches with the same producer id and producer + // epoch, but there is an edge case where we may remove the wrong batch if the comparator + // only takes `baseSequence` into account. 
+ // See https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for details. + private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = + Comparator.comparingLong(ProducerBatch::producerId) + .thenComparingInt(ProducerBatch::producerEpoch) + .thenComparingInt(ProducerBatch::baseSequence); TopicPartitionEntry() { this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
