This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ea7b418fb MINOR: Make TopicPartitionBookkeeper and 
TopicPartitionEntry top level (#12097)
3ea7b418fb is described below

commit 3ea7b418fb3d7e9fc74c27751c1b02b04877f197
Author: Ismael Juma <[email protected]>
AuthorDate: Wed May 11 09:30:46 2022 -0700

    MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level 
(#12097)
    
    This is the first step towards refactoring the `TransactionManager` so
    that it's easier to understand and test. The high level idea is to push
    down behavior to `TopicPartitionEntry` and `TopicPartitionBookkeeper`
    and to encapsulate the state so that the mutations can only be done via
    the appropriate methods.
    
    Inner classes have no mechanism to limit access from the outer class,
    which presents a challenge when mutability is widespread (like we do
    here).
    
    As a first step, we make `TopicPartitionBookkeeper` and
    `TopicPartitionEntry` top level and rename them and a couple
    of methods to make the intended usage clear and avoid
    redundancy.
    
    To make the review easier, we don't change anything else
    except access changes required for the code to compile.
    The next PR will contain the rest of the refactoring.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../producer/internals/TransactionManager.java     | 157 ++++-----------------
 .../producer/internals/TxnPartitionEntry.java      |  74 ++++++++++
 .../producer/internals/TxnPartitionMap.java        |  83 +++++++++++
 3 files changed, 183 insertions(+), 131 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index f70c6e5420..5aab62eaf2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -68,7 +68,6 @@ import 
org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.PrimitiveRef;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -84,8 +83,6 @@ import java.util.OptionalLong;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /**
@@ -93,116 +90,14 @@ import java.util.function.Supplier;
  */
 public class TransactionManager {
     private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
-    private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
+    static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
 
     private final Logger log;
     private final String transactionalId;
     private final int transactionTimeoutMs;
     private final ApiVersions apiVersions;
 
-    private static class TopicPartitionBookkeeper {
-
-        private final Map<TopicPartition, TopicPartitionEntry> topicPartitions 
= new HashMap<>();
-
-        private TopicPartitionEntry getPartition(TopicPartition 
topicPartition) {
-            TopicPartitionEntry ent = topicPartitions.get(topicPartition);
-            if (ent == null)
-                throw new IllegalStateException("Trying to get the sequence 
number for " + topicPartition +
-                        ", but the sequence number was never set for this 
partition.");
-            return ent;
-        }
-
-        private TopicPartitionEntry getOrCreatePartition(TopicPartition 
topicPartition) {
-            return topicPartitions.computeIfAbsent(topicPartition, tp -> new 
TopicPartitionEntry());
-        }
-
-        private boolean contains(TopicPartition topicPartition) {
-            return topicPartitions.containsKey(topicPartition);
-        }
-
-        private void reset() {
-            topicPartitions.clear();
-        }
-
-        private OptionalLong lastAckedOffset(TopicPartition topicPartition) {
-            TopicPartitionEntry entry = topicPartitions.get(topicPartition);
-            if (entry != null && entry.lastAckedOffset != 
ProduceResponse.INVALID_OFFSET)
-                return OptionalLong.of(entry.lastAckedOffset);
-            else
-                return OptionalLong.empty();
-        }
-
-        private OptionalInt lastAckedSequence(TopicPartition topicPartition) {
-            TopicPartitionEntry entry = topicPartitions.get(topicPartition);
-            if (entry != null && entry.lastAckedSequence != 
NO_LAST_ACKED_SEQUENCE_NUMBER)
-                return OptionalInt.of(entry.lastAckedSequence);
-            else
-                return OptionalInt.empty();
-        }
-
-        private void startSequencesAtBeginning(TopicPartition topicPartition, 
ProducerIdAndEpoch newProducerIdAndEpoch) {
-            final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
-            TopicPartitionEntry topicPartitionEntry = 
getPartition(topicPartition);
-            topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> {
-                inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value, inFlightBatch.isTransactional());
-                sequence.value += inFlightBatch.recordCount;
-            });
-            topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
-            topicPartitionEntry.nextSequence = sequence.value;
-            topicPartitionEntry.lastAckedSequence = 
NO_LAST_ACKED_SEQUENCE_NUMBER;
-        }
-    }
-
-    private static class TopicPartitionEntry {
-
-        // The producer id/epoch being used for a given partition.
-        private ProducerIdAndEpoch producerIdAndEpoch;
-
-        // The base sequence of the next batch bound for a given partition.
-        private int nextSequence;
-
-        // The sequence number of the last record of the last ack'd batch from 
the given partition. When there are no
-        // in flight requests for a partition, the 
lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
-        private int lastAckedSequence;
-
-        // Keep track of the in flight batches bound for a partition, ordered 
by sequence. This helps us to ensure that
-        // we continue to order batches by the sequence numbers even when the 
responses come back out of order during
-        // leader failover. We add a batch to the queue when it is drained, 
and remove it when the batch completes
-        // (either successfully or through a fatal failure).
-        private SortedSet<ProducerBatch> inflightBatchesBySequence;
-
-        // We keep track of the last acknowledged offset on a per partition 
basis in order to disambiguate UnknownProducer
-        // responses which are due to the retention period elapsing, and those 
which are due to actual lost data.
-        private long lastAckedOffset;
-
-        // `inflightBatchesBySequence` should only have batches with the same 
producer id and producer
-        // epoch, but there is an edge case where we may remove the wrong 
batch if the comparator
-        // only takes `baseSequence` into account.
-        // See 
https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for 
details.
-        private static final Comparator<ProducerBatch> 
PRODUCER_BATCH_COMPARATOR =
-            Comparator.comparingLong(ProducerBatch::producerId)
-                .thenComparing(ProducerBatch::producerEpoch)
-                .thenComparingInt(ProducerBatch::baseSequence);
-
-        TopicPartitionEntry() {
-            this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
-            this.nextSequence = 0;
-            this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
-            this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
-            this.inflightBatchesBySequence = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
-        }
-
-        void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
-            TreeSet<ProducerBatch> newInflights = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
-            for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
-                resetSequence.accept(inflightBatch);
-                newInflights.add(inflightBatch);
-            }
-            inflightBatchesBySequence = newInflights;
-        }
-    }
-
-    private final TopicPartitionBookkeeper topicPartitionBookkeeper;
+    private final TxnPartitionMap txnPartitionMap;
 
     private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
 
@@ -320,7 +215,7 @@ public class TransactionManager {
         this.partitionsWithUnresolvedSequences = new HashMap<>();
         this.partitionsToRewriteSequences = new HashSet<>();
         this.retryBackoffMs = retryBackoffMs;
-        this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
+        this.txnPartitionMap = new TxnPartitionMap();
         this.apiVersions = apiVersions;
     }
 
@@ -444,7 +339,7 @@ public class TransactionManager {
                 return;
             } else {
                 log.debug("Begin adding new partition {} to transaction", 
topicPartition);
-                topicPartitionBookkeeper.getOrCreatePartition(topicPartition);
+                txnPartitionMap.getOrCreate(topicPartition);
                 newPartitionsInTransaction.add(topicPartition);
             }
         }
@@ -532,7 +427,7 @@ public class TransactionManager {
         if (hasStaleProducerIdAndEpoch(topicPartition) && 
!hasInflightBatches(topicPartition)) {
             // If the batch was on a different ID and/or epoch (due to an 
epoch bump) and all its in-flight batches
             // have completed, reset the partition sequence so that the next 
batch (with the new epoch) starts from 0
-            topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition, 
this.producerIdAndEpoch);
+            txnPartitionMap.startSequencesAtBeginning(topicPartition, 
this.producerIdAndEpoch);
             log.debug("ProducerId of partition {} set to {} with epoch {}. 
Reinitialize sequence at beginning.",
                       topicPartition, producerIdAndEpoch.producerId, 
producerIdAndEpoch.epoch);
         }
@@ -561,12 +456,12 @@ public class TransactionManager {
     }
 
     private void resetSequenceForPartition(TopicPartition topicPartition) {
-        topicPartitionBookkeeper.topicPartitions.remove(topicPartition);
+        txnPartitionMap.topicPartitions.remove(topicPartition);
         this.partitionsWithUnresolvedSequences.remove(topicPartition);
     }
 
     private void resetSequenceNumbers() {
-        topicPartitionBookkeeper.reset();
+        txnPartitionMap.reset();
         this.partitionsWithUnresolvedSequences.clear();
     }
 
@@ -585,7 +480,7 @@ public class TransactionManager {
 
         // When the epoch is bumped, rewrite all in-flight sequences for the 
partition(s) that triggered the epoch bump
         for (TopicPartition topicPartition : 
this.partitionsToRewriteSequences) {
-            
this.topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition, 
this.producerIdAndEpoch);
+            this.txnPartitionMap.startSequencesAtBeginning(topicPartition, 
this.producerIdAndEpoch);
             this.partitionsWithUnresolvedSequences.remove(topicPartition);
         }
         this.partitionsToRewriteSequences.clear();
@@ -613,27 +508,27 @@ public class TransactionManager {
      * Returns the next sequence number to be written to the given 
TopicPartition.
      */
     synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        return 
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).nextSequence;
+        return txnPartitionMap.getOrCreate(topicPartition).nextSequence;
     }
 
     /**
      * Returns the current producer id/epoch of the given TopicPartition.
      */
     synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition 
topicPartition) {
-        return 
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch;
+        return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch;
     }
 
     synchronized void incrementSequenceNumber(TopicPartition topicPartition, 
int increment) {
         Integer currentSequence = sequenceNumber(topicPartition);
 
         currentSequence = 
DefaultRecordBatch.incrementSequence(currentSequence, increment);
-        topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = 
currentSequence;
+        txnPartitionMap.get(topicPartition).nextSequence = currentSequence;
     }
 
     synchronized void addInFlightBatch(ProducerBatch batch) {
         if (!batch.hasSequence())
             throw new IllegalStateException("Can't track batch for partition " 
+ batch.topicPartition + " when sequence is not set.");
-        
topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.add(batch);
+        
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.add(batch);
     }
 
     /**
@@ -647,7 +542,7 @@ public class TransactionManager {
         if (!hasInflightBatches(topicPartition))
             return RecordBatch.NO_SEQUENCE;
 
-        SortedSet<ProducerBatch> inflightBatches = 
topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
+        SortedSet<ProducerBatch> inflightBatches = 
txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
         if (inflightBatches.isEmpty())
             return RecordBatch.NO_SEQUENCE;
         else
@@ -655,20 +550,20 @@ public class TransactionManager {
     }
 
     synchronized ProducerBatch nextBatchBySequence(TopicPartition 
topicPartition) {
-        SortedSet<ProducerBatch> queue = 
topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
+        SortedSet<ProducerBatch> queue = 
txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
         return queue.isEmpty() ? null : queue.first();
     }
 
     synchronized void removeInFlightBatch(ProducerBatch batch) {
         if (hasInflightBatches(batch.topicPartition)) {
-            
topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.remove(batch);
+            
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.remove(batch);
         }
     }
 
     private int maybeUpdateLastAckedSequence(TopicPartition topicPartition, 
int sequence) {
         int lastAckedSequence = 
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER);
         if (sequence > lastAckedSequence) {
-            
topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = 
sequence;
+            txnPartitionMap.get(topicPartition).lastAckedSequence = sequence;
             return sequence;
         }
 
@@ -676,11 +571,11 @@ public class TransactionManager {
     }
 
     synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
-        return topicPartitionBookkeeper.lastAckedSequence(topicPartition);
+        return txnPartitionMap.lastAckedSequence(topicPartition);
     }
 
     synchronized OptionalLong lastAckedOffset(TopicPartition topicPartition) {
-        return topicPartitionBookkeeper.lastAckedOffset(topicPartition);
+        return txnPartitionMap.lastAckedOffset(topicPartition);
     }
 
     private void updateLastAckedOffset(ProduceResponse.PartitionResponse 
response, ProducerBatch batch) {
@@ -692,10 +587,10 @@ public class TransactionManager {
         // response for this. This can happen only if the producer is only 
idempotent (not transactional) and in
         // this case there will be no tracked bookkeeper entry about it, so we 
have to insert one.
         if (!lastAckedOffset.isPresent() && !isTransactional()) {
-            
topicPartitionBookkeeper.getOrCreatePartition(batch.topicPartition);
+            txnPartitionMap.getOrCreate(batch.topicPartition);
         }
         if (lastOffset > 
lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
-            
topicPartitionBookkeeper.getPartition(batch.topicPartition).lastAckedOffset = 
lastOffset;
+            txnPartitionMap.get(batch.topicPartition).lastAckedOffset = 
lastOffset;
         } else {
             log.trace("Partition {} keeps lastOffset at {}", 
batch.topicPartition, lastOffset);
         }
@@ -768,7 +663,7 @@ public class TransactionManager {
     // This method must only be called when we know that the batch is question 
has been unequivocally failed by the broker,
     // ie. it has received a confirmed fatal status code like 'Message Too 
Large' or something similar.
     private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
-        if (!topicPartitionBookkeeper.contains(batch.topicPartition))
+        if (!txnPartitionMap.contains(batch.topicPartition))
             // Sequence numbers are not being tracked for this partition. This 
could happen if the producer id was just
             // reset due to a previous OutOfOrderSequenceException.
             return;
@@ -781,7 +676,7 @@ public class TransactionManager {
 
         setNextSequence(batch.topicPartition, currentSequence);
 
-        
topicPartitionBookkeeper.getPartition(batch.topicPartition).resetSequenceNumbers(inFlightBatch
 -> {
+        
txnPartitionMap.get(batch.topicPartition).resetSequenceNumbers(inFlightBatch -> 
{
             if (inFlightBatch.baseSequence() < batch.baseSequence())
                 return;
 
@@ -795,11 +690,11 @@ public class TransactionManager {
     }
 
     synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
-        return 
!topicPartitionBookkeeper.getOrCreatePartition(topicPartition).inflightBatchesBySequence.isEmpty();
+        return 
!txnPartitionMap.getOrCreate(topicPartition).inflightBatchesBySequence.isEmpty();
     }
 
     synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition 
topicPartition) {
-        return 
!producerIdAndEpoch.equals(topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch);
+        return 
!producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch);
     }
 
     synchronized boolean hasUnresolvedSequences() {
@@ -864,7 +759,7 @@ public class TransactionManager {
     }
 
     private void setNextSequence(TopicPartition topicPartition, int sequence) {
-        topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = 
sequence;
+        txnPartitionMap.get(topicPartition).nextSequence = sequence;
     }
 
     private boolean isNextSequenceForUnresolvedPartition(TopicPartition 
topicPartition, int sequence) {
@@ -1016,7 +911,7 @@ public class TransactionManager {
                 // inflight batches to be from the beginning and retry them, 
so that the transaction does not need to
                 // be aborted. For the idempotent producer, bump the epoch to 
avoid reusing (sequence, epoch) pairs
                 if (isTransactional()) {
-                    
topicPartitionBookkeeper.startSequencesAtBeginning(batch.topicPartition, 
this.producerIdAndEpoch);
+                    
txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, 
this.producerIdAndEpoch);
                 } else {
                     requestEpochBumpForPartition(batch.topicPartition);
                 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
new file mode 100644
index 0000000000..be79d8ee0f
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionEntry {
+
+    // The producer id/epoch being used for a given partition.
+    ProducerIdAndEpoch producerIdAndEpoch;
+
+    // The base sequence of the next batch bound for a given partition.
+    int nextSequence;
+
+    // The sequence number of the last record of the last ack'd batch from the 
given partition. When there are no
+    // in flight requests for a partition, the 
lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
+    int lastAckedSequence;
+
+    // Keep track of the in flight batches bound for a partition, ordered by 
sequence. This helps us to ensure that
+    // we continue to order batches by the sequence numbers even when the 
responses come back out of order during
+    // leader failover. We add a batch to the queue when it is drained, and 
remove it when the batch completes
+    // (either successfully or through a fatal failure).
+    SortedSet<ProducerBatch> inflightBatchesBySequence;
+
+    // We keep track of the last acknowledged offset on a per partition basis 
in order to disambiguate UnknownProducer
+    // responses which are due to the retention period elapsing, and those 
which are due to actual lost data.
+    long lastAckedOffset;
+
+    // `inflightBatchesBySequence` should only have batches with the same 
producer id and producer
+    // epoch, but there is an edge case where we may remove the wrong batch if 
the comparator
+    // only takes `baseSequence` into account.
+    // See 
https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for 
details.
+    private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR =
+        Comparator.comparingLong(ProducerBatch::producerId)
+            .thenComparingInt(ProducerBatch::producerEpoch)
+            .thenComparingInt(ProducerBatch::baseSequence);
+
+    TxnPartitionEntry() {
+        this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
+        this.nextSequence = 0;
+        this.lastAckedSequence = 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+        this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
+        this.inflightBatchesBySequence = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
+    }
+
+    void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+        TreeSet<ProducerBatch> newInflights = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
+        for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
+            resetSequence.accept(inflightBatch);
+            newInflights.add(inflightBatch);
+        }
+        inflightBatchesBySequence = newInflights;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
new file mode 100644
index 0000000000..95553119c5
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionMap {
+
+    final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new 
HashMap<>();
+
+    TxnPartitionEntry get(TopicPartition topicPartition) {
+        TxnPartitionEntry ent = topicPartitions.get(topicPartition);
+        if (ent == null) {
+            throw new IllegalStateException("Trying to get the sequence number 
for " + topicPartition +
+                ", but the sequence number was never set for this partition.");
+        }
+        return ent;
+    }
+
+    TxnPartitionEntry getOrCreate(TopicPartition topicPartition) {
+        return topicPartitions.computeIfAbsent(topicPartition, tp -> new 
TxnPartitionEntry());
+    }
+
+    boolean contains(TopicPartition topicPartition) {
+        return topicPartitions.containsKey(topicPartition);
+    }
+
+    void reset() {
+        topicPartitions.clear();
+    }
+
+    OptionalLong lastAckedOffset(TopicPartition topicPartition) {
+        TxnPartitionEntry entry = topicPartitions.get(topicPartition);
+        if (entry != null && entry.lastAckedOffset != 
ProduceResponse.INVALID_OFFSET) {
+            return OptionalLong.of(entry.lastAckedOffset);
+        } else {
+            return OptionalLong.empty();
+        }
+    }
+
+    OptionalInt lastAckedSequence(TopicPartition topicPartition) {
+        TxnPartitionEntry entry = topicPartitions.get(topicPartition);
+        if (entry != null && entry.lastAckedSequence != 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER) {
+            return OptionalInt.of(entry.lastAckedSequence);
+        } else {
+            return OptionalInt.empty();
+        }
+    }
+
+    void startSequencesAtBeginning(TopicPartition topicPartition, 
ProducerIdAndEpoch newProducerIdAndEpoch) {
+        final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+        TxnPartitionEntry topicPartitionEntry = get(topicPartition);
+        topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> {
+            inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value, inFlightBatch.isTransactional());
+            sequence.value += inFlightBatch.recordCount;
+        });
+        topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
+        topicPartitionEntry.nextSequence = sequence.value;
+        topicPartitionEntry.lastAckedSequence = 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+    }
+}

Reply via email to