This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c462a657ec KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096)
c462a657ec is described below

commit c462a657ecbc4761e247c28159fa4055546f1be2
Author: Ismael Juma <[email protected]>
AuthorDate: Thu Apr 28 06:13:23 2022 -0700

    KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096)
    
    Conceptually, the ordering is defined by the producer id, producer epoch
    and the sequence number. This set should generally only have entries
    for the same producer id and epoch, but there is one case where
    we can have conflicting `remove` calls and hence we add this as
    a temporary safe fix.
    
    We'll follow up with a fix that ensures the original intended invariant.
    
    Reviewers: Jason Gustafson <[email protected]>, David Jacot <[email protected]>, Luke Chen <[email protected]>
---
 .../clients/producer/internals/TransactionManager.java      | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 961e4e4f5a..f70c6e5420 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -175,11 +175,14 @@ public class TransactionManager {
         // responses which are due to the retention period elapsing, and those which are due to actual lost data.
         private long lastAckedOffset;
 
-        private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
-            if (b1.baseSequence() < b2.baseSequence()) return -1;
-            else if (b1.baseSequence() > b2.baseSequence()) return 1;
-            else return Integer.compare(b1.hashCode(), b2.hashCode());
-        };
+        // `inflightBatchesBySequence` should only have batches with the same producer id and producer
+        // epoch, but there is an edge case where we may remove the wrong batch if the comparator
+        // only takes `baseSequence` into account.
+        // See https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for details.
+        private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR =
+            Comparator.comparingLong(ProducerBatch::producerId)
+                .thenComparing(ProducerBatch::producerEpoch)
+                .thenComparingInt(ProducerBatch::baseSequence);
 
         TopicPartitionEntry() {
             this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
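
The edge case named in the new code comment can be illustrated with a small standalone sketch. It is not taken from the Kafka code base: `Batch`, `ComparatorSketch`, `BY_SEQUENCE_ONLY` and `survivesStaleRemove` are hypothetical names, and `Batch` is a stand-in that carries only the three fields the comparator reads. The sequence-only comparator is the variant the comment warns about, not the hashCode-based one removed by this diff. Assuming a sorted set keyed by such a comparator, a `remove` call made with a batch from an older producer epoch evicts an in-flight batch that happens to share its base sequence, while the composite ordering leaves it in place.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

final class ComparatorSketch {
    // Hypothetical stand-in for ProducerBatch: only the fields the comparator reads.
    static final class Batch {
        private final long producerId;
        private final short producerEpoch;
        private final int baseSequence;

        Batch(long producerId, short producerEpoch, int baseSequence) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.baseSequence = baseSequence;
        }

        long producerId() { return producerId; }
        short producerEpoch() { return producerEpoch; }
        int baseSequence() { return baseSequence; }
    }

    // Ordering that looks at the base sequence alone (the case the comment warns about).
    static final Comparator<Batch> BY_SEQUENCE_ONLY =
        Comparator.comparingInt(Batch::baseSequence);

    // Same shape as the comparator introduced by this commit.
    static final Comparator<Batch> BY_ID_EPOCH_SEQUENCE =
        Comparator.comparingLong(Batch::producerId)
            .thenComparing(Batch::producerEpoch)
            .thenComparingInt(Batch::baseSequence);

    // Adds `inflight`, calls remove with `stale`, and reports whether `inflight` is still present.
    static boolean survivesStaleRemove(Comparator<Batch> comparator, Batch inflight, Batch stale) {
        SortedSet<Batch> set = new TreeSet<>(comparator);
        set.add(inflight);
        set.remove(stale); // TreeSet decides equality through the comparator, not equals()
        return set.contains(inflight);
    }

    public static void main(String[] args) {
        Batch inflight = new Batch(1L, (short) 1, 0); // current epoch
        Batch stale = new Batch(1L, (short) 0, 0);    // older epoch, same base sequence
        System.out.println(survivesStaleRemove(BY_SEQUENCE_ONLY, inflight, stale));
        System.out.println(survivesStaleRemove(BY_ID_EPOCH_SEQUENCE, inflight, stale));
    }
}
```

A `TreeSet` treats two elements as the same entry whenever the comparator returns 0, so every field that distinguishes batches has to take part in the ordering. That is why the composite comparator includes the producer id and epoch even though the set is normally expected to hold a single id and epoch.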
