This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ea7b418fb MINOR: Make TopicPartitionBookkeeper and
TopicPartitionEntry top level (#12097)
3ea7b418fb is described below
commit 3ea7b418fb3d7e9fc74c27751c1b02b04877f197
Author: Ismael Juma <[email protected]>
AuthorDate: Wed May 11 09:30:46 2022 -0700
MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level
(#12097)
This is the first step towards refactoring the `TransactionManager` so
that it's easier to understand and test. The high level idea is to push
down behavior to `TopicPartitionEntry` and `TopicPartitionBookkeeper`
and to encapsulate the state so that the mutations can only be done via
the appropriate methods.
Inner classes have no mechanism to limit access from the outer class,
which presents a challenge when mutability is widespread (as it is
here).
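As a first step, we make `TopicPartitionBookkeeper` and
`TopicPartitionEntry` top level, rename them to `TxnPartitionMap` and
`TxnPartitionEntry`, and rename a couple of methods (`getPartition` to
`get`, `getOrCreatePartition` to `getOrCreate`) to make the intended
usage clear and avoid redundancy.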
To make the review easier, we don't change anything else
except access changes required for the code to compile.
The next PR will contain the rest of the refactoring.
Reviewers: Jason Gustafson <[email protected]>
---
.../producer/internals/TransactionManager.java | 157 ++++-----------------
.../producer/internals/TxnPartitionEntry.java | 74 ++++++++++
.../producer/internals/TxnPartitionMap.java | 83 +++++++++++
3 files changed, 183 insertions(+), 131 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index f70c6e5420..5aab62eaf2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -68,7 +68,6 @@ import
org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.PrimitiveRef;
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -84,8 +83,6 @@ import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.function.Consumer;
import java.util.function.Supplier;
/**
@@ -93,116 +90,14 @@ import java.util.function.Supplier;
*/
public class TransactionManager {
private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
- private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
+ static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
private final Logger log;
private final String transactionalId;
private final int transactionTimeoutMs;
private final ApiVersions apiVersions;
- private static class TopicPartitionBookkeeper {
-
- private final Map<TopicPartition, TopicPartitionEntry> topicPartitions
= new HashMap<>();
-
- private TopicPartitionEntry getPartition(TopicPartition
topicPartition) {
- TopicPartitionEntry ent = topicPartitions.get(topicPartition);
- if (ent == null)
- throw new IllegalStateException("Trying to get the sequence
number for " + topicPartition +
- ", but the sequence number was never set for this
partition.");
- return ent;
- }
-
- private TopicPartitionEntry getOrCreatePartition(TopicPartition
topicPartition) {
- return topicPartitions.computeIfAbsent(topicPartition, tp -> new
TopicPartitionEntry());
- }
-
- private boolean contains(TopicPartition topicPartition) {
- return topicPartitions.containsKey(topicPartition);
- }
-
- private void reset() {
- topicPartitions.clear();
- }
-
- private OptionalLong lastAckedOffset(TopicPartition topicPartition) {
- TopicPartitionEntry entry = topicPartitions.get(topicPartition);
- if (entry != null && entry.lastAckedOffset !=
ProduceResponse.INVALID_OFFSET)
- return OptionalLong.of(entry.lastAckedOffset);
- else
- return OptionalLong.empty();
- }
-
- private OptionalInt lastAckedSequence(TopicPartition topicPartition) {
- TopicPartitionEntry entry = topicPartitions.get(topicPartition);
- if (entry != null && entry.lastAckedSequence !=
NO_LAST_ACKED_SEQUENCE_NUMBER)
- return OptionalInt.of(entry.lastAckedSequence);
- else
- return OptionalInt.empty();
- }
-
- private void startSequencesAtBeginning(TopicPartition topicPartition,
ProducerIdAndEpoch newProducerIdAndEpoch) {
- final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
- TopicPartitionEntry topicPartitionEntry =
getPartition(topicPartition);
- topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> {
- inFlightBatch.resetProducerState(newProducerIdAndEpoch,
sequence.value, inFlightBatch.isTransactional());
- sequence.value += inFlightBatch.recordCount;
- });
- topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
- topicPartitionEntry.nextSequence = sequence.value;
- topicPartitionEntry.lastAckedSequence =
NO_LAST_ACKED_SEQUENCE_NUMBER;
- }
- }
-
- private static class TopicPartitionEntry {
-
- // The producer id/epoch being used for a given partition.
- private ProducerIdAndEpoch producerIdAndEpoch;
-
- // The base sequence of the next batch bound for a given partition.
- private int nextSequence;
-
- // The sequence number of the last record of the last ack'd batch from
the given partition. When there are no
- // in flight requests for a partition, the
lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
- private int lastAckedSequence;
-
- // Keep track of the in flight batches bound for a partition, ordered
by sequence. This helps us to ensure that
- // we continue to order batches by the sequence numbers even when the
responses come back out of order during
- // leader failover. We add a batch to the queue when it is drained,
and remove it when the batch completes
- // (either successfully or through a fatal failure).
- private SortedSet<ProducerBatch> inflightBatchesBySequence;
-
- // We keep track of the last acknowledged offset on a per partition
basis in order to disambiguate UnknownProducer
- // responses which are due to the retention period elapsing, and those
which are due to actual lost data.
- private long lastAckedOffset;
-
- // `inflightBatchesBySequence` should only have batches with the same
producer id and producer
- // epoch, but there is an edge case where we may remove the wrong
batch if the comparator
- // only takes `baseSequence` into account.
- // See
https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for
details.
- private static final Comparator<ProducerBatch>
PRODUCER_BATCH_COMPARATOR =
- Comparator.comparingLong(ProducerBatch::producerId)
- .thenComparing(ProducerBatch::producerEpoch)
- .thenComparingInt(ProducerBatch::baseSequence);
-
- TopicPartitionEntry() {
- this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
- this.nextSequence = 0;
- this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
- this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
- this.inflightBatchesBySequence = new
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
- }
-
- void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
- TreeSet<ProducerBatch> newInflights = new
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
- for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
- resetSequence.accept(inflightBatch);
- newInflights.add(inflightBatch);
- }
- inflightBatchesBySequence = newInflights;
- }
- }
-
- private final TopicPartitionBookkeeper topicPartitionBookkeeper;
+ private final TxnPartitionMap txnPartitionMap;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
@@ -320,7 +215,7 @@ public class TransactionManager {
this.partitionsWithUnresolvedSequences = new HashMap<>();
this.partitionsToRewriteSequences = new HashSet<>();
this.retryBackoffMs = retryBackoffMs;
- this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
+ this.txnPartitionMap = new TxnPartitionMap();
this.apiVersions = apiVersions;
}
@@ -444,7 +339,7 @@ public class TransactionManager {
return;
} else {
log.debug("Begin adding new partition {} to transaction",
topicPartition);
- topicPartitionBookkeeper.getOrCreatePartition(topicPartition);
+ txnPartitionMap.getOrCreate(topicPartition);
newPartitionsInTransaction.add(topicPartition);
}
}
@@ -532,7 +427,7 @@ public class TransactionManager {
if (hasStaleProducerIdAndEpoch(topicPartition) &&
!hasInflightBatches(topicPartition)) {
// If the batch was on a different ID and/or epoch (due to an
epoch bump) and all its in-flight batches
// have completed, reset the partition sequence so that the next
batch (with the new epoch) starts from 0
- topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition,
this.producerIdAndEpoch);
+ txnPartitionMap.startSequencesAtBeginning(topicPartition,
this.producerIdAndEpoch);
log.debug("ProducerId of partition {} set to {} with epoch {}.
Reinitialize sequence at beginning.",
topicPartition, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch);
}
@@ -561,12 +456,12 @@ public class TransactionManager {
}
private void resetSequenceForPartition(TopicPartition topicPartition) {
- topicPartitionBookkeeper.topicPartitions.remove(topicPartition);
+ txnPartitionMap.topicPartitions.remove(topicPartition);
this.partitionsWithUnresolvedSequences.remove(topicPartition);
}
private void resetSequenceNumbers() {
- topicPartitionBookkeeper.reset();
+ txnPartitionMap.reset();
this.partitionsWithUnresolvedSequences.clear();
}
@@ -585,7 +480,7 @@ public class TransactionManager {
// When the epoch is bumped, rewrite all in-flight sequences for the
partition(s) that triggered the epoch bump
for (TopicPartition topicPartition :
this.partitionsToRewriteSequences) {
-
this.topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition,
this.producerIdAndEpoch);
+ this.txnPartitionMap.startSequencesAtBeginning(topicPartition,
this.producerIdAndEpoch);
this.partitionsWithUnresolvedSequences.remove(topicPartition);
}
this.partitionsToRewriteSequences.clear();
@@ -613,27 +508,27 @@ public class TransactionManager {
* Returns the next sequence number to be written to the given
TopicPartition.
*/
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
- return
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).nextSequence;
+ return txnPartitionMap.getOrCreate(topicPartition).nextSequence;
}
/**
* Returns the current producer id/epoch of the given TopicPartition.
*/
synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition
topicPartition) {
- return
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch;
+ return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch;
}
synchronized void incrementSequenceNumber(TopicPartition topicPartition,
int increment) {
Integer currentSequence = sequenceNumber(topicPartition);
currentSequence =
DefaultRecordBatch.incrementSequence(currentSequence, increment);
- topicPartitionBookkeeper.getPartition(topicPartition).nextSequence =
currentSequence;
+ txnPartitionMap.get(topicPartition).nextSequence = currentSequence;
}
synchronized void addInFlightBatch(ProducerBatch batch) {
if (!batch.hasSequence())
throw new IllegalStateException("Can't track batch for partition "
+ batch.topicPartition + " when sequence is not set.");
-
topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.add(batch);
+
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.add(batch);
}
/**
@@ -647,7 +542,7 @@ public class TransactionManager {
if (!hasInflightBatches(topicPartition))
return RecordBatch.NO_SEQUENCE;
- SortedSet<ProducerBatch> inflightBatches =
topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
+ SortedSet<ProducerBatch> inflightBatches =
txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
if (inflightBatches.isEmpty())
return RecordBatch.NO_SEQUENCE;
else
@@ -655,20 +550,20 @@ public class TransactionManager {
}
synchronized ProducerBatch nextBatchBySequence(TopicPartition
topicPartition) {
- SortedSet<ProducerBatch> queue =
topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
+ SortedSet<ProducerBatch> queue =
txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
return queue.isEmpty() ? null : queue.first();
}
synchronized void removeInFlightBatch(ProducerBatch batch) {
if (hasInflightBatches(batch.topicPartition)) {
-
topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.remove(batch);
+
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.remove(batch);
}
}
private int maybeUpdateLastAckedSequence(TopicPartition topicPartition,
int sequence) {
int lastAckedSequence =
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER);
if (sequence > lastAckedSequence) {
-
topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence =
sequence;
+ txnPartitionMap.get(topicPartition).lastAckedSequence = sequence;
return sequence;
}
@@ -676,11 +571,11 @@ public class TransactionManager {
}
synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
- return topicPartitionBookkeeper.lastAckedSequence(topicPartition);
+ return txnPartitionMap.lastAckedSequence(topicPartition);
}
synchronized OptionalLong lastAckedOffset(TopicPartition topicPartition) {
- return topicPartitionBookkeeper.lastAckedOffset(topicPartition);
+ return txnPartitionMap.lastAckedOffset(topicPartition);
}
private void updateLastAckedOffset(ProduceResponse.PartitionResponse
response, ProducerBatch batch) {
@@ -692,10 +587,10 @@ public class TransactionManager {
// response for this. This can happen only if the producer is only
idempotent (not transactional) and in
// this case there will be no tracked bookkeeper entry about it, so we
have to insert one.
if (!lastAckedOffset.isPresent() && !isTransactional()) {
-
topicPartitionBookkeeper.getOrCreatePartition(batch.topicPartition);
+ txnPartitionMap.getOrCreate(batch.topicPartition);
}
if (lastOffset >
lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
-
topicPartitionBookkeeper.getPartition(batch.topicPartition).lastAckedOffset =
lastOffset;
+ txnPartitionMap.get(batch.topicPartition).lastAckedOffset =
lastOffset;
} else {
log.trace("Partition {} keeps lastOffset at {}",
batch.topicPartition, lastOffset);
}
@@ -768,7 +663,7 @@ public class TransactionManager {
// This method must only be called when we know that the batch is question
has been unequivocally failed by the broker,
// ie. it has received a confirmed fatal status code like 'Message Too
Large' or something similar.
private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
- if (!topicPartitionBookkeeper.contains(batch.topicPartition))
+ if (!txnPartitionMap.contains(batch.topicPartition))
// Sequence numbers are not being tracked for this partition. This
could happen if the producer id was just
// reset due to a previous OutOfOrderSequenceException.
return;
@@ -781,7 +676,7 @@ public class TransactionManager {
setNextSequence(batch.topicPartition, currentSequence);
-
topicPartitionBookkeeper.getPartition(batch.topicPartition).resetSequenceNumbers(inFlightBatch
-> {
+
txnPartitionMap.get(batch.topicPartition).resetSequenceNumbers(inFlightBatch ->
{
if (inFlightBatch.baseSequence() < batch.baseSequence())
return;
@@ -795,11 +690,11 @@ public class TransactionManager {
}
synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
- return
!topicPartitionBookkeeper.getOrCreatePartition(topicPartition).inflightBatchesBySequence.isEmpty();
+ return
!txnPartitionMap.getOrCreate(topicPartition).inflightBatchesBySequence.isEmpty();
}
synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition
topicPartition) {
- return
!producerIdAndEpoch.equals(topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch);
+ return
!producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch);
}
synchronized boolean hasUnresolvedSequences() {
@@ -864,7 +759,7 @@ public class TransactionManager {
}
private void setNextSequence(TopicPartition topicPartition, int sequence) {
- topicPartitionBookkeeper.getPartition(topicPartition).nextSequence =
sequence;
+ txnPartitionMap.get(topicPartition).nextSequence = sequence;
}
private boolean isNextSequenceForUnresolvedPartition(TopicPartition
topicPartition, int sequence) {
@@ -1016,7 +911,7 @@ public class TransactionManager {
// inflight batches to be from the beginning and retry them,
so that the transaction does not need to
// be aborted. For the idempotent producer, bump the epoch to
avoid reusing (sequence, epoch) pairs
if (isTransactional()) {
-
topicPartitionBookkeeper.startSequencesAtBeginning(batch.topicPartition,
this.producerIdAndEpoch);
+
txnPartitionMap.startSequencesAtBeginning(batch.topicPartition,
this.producerIdAndEpoch);
} else {
requestEpochBumpForPartition(batch.topicPartition);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
new file mode 100644
index 0000000000..be79d8ee0f
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionEntry {
+
+ // The producer id/epoch being used for a given partition.
+ ProducerIdAndEpoch producerIdAndEpoch;
+
+ // The base sequence of the next batch bound for a given partition.
+ int nextSequence;
+
+ // The sequence number of the last record of the last ack'd batch from the
given partition. When there are no
+ // in flight requests for a partition, the
lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
+ int lastAckedSequence;
+
+ // Keep track of the in flight batches bound for a partition, ordered by
sequence. This helps us to ensure that
+ // we continue to order batches by the sequence numbers even when the
responses come back out of order during
+ // leader failover. We add a batch to the queue when it is drained, and
remove it when the batch completes
+ // (either successfully or through a fatal failure).
+ SortedSet<ProducerBatch> inflightBatchesBySequence;
+
+ // We keep track of the last acknowledged offset on a per partition basis
in order to disambiguate UnknownProducer
+ // responses which are due to the retention period elapsing, and those
which are due to actual lost data.
+ long lastAckedOffset;
+
+ // `inflightBatchesBySequence` should only have batches with the same
producer id and producer
+ // epoch, but there is an edge case where we may remove the wrong batch if
the comparator
+ // only takes `baseSequence` into account.
+ // See
https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for
details.
+ private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR =
+ Comparator.comparingLong(ProducerBatch::producerId)
+ .thenComparingInt(ProducerBatch::producerEpoch)
+ .thenComparingInt(ProducerBatch::baseSequence);
+
+ TxnPartitionEntry() {
+ this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
+ this.nextSequence = 0;
+ this.lastAckedSequence =
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+ this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
+ this.inflightBatchesBySequence = new
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
+ }
+
+ void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+ TreeSet<ProducerBatch> newInflights = new
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
+ for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
+ resetSequence.accept(inflightBatch);
+ newInflights.add(inflightBatch);
+ }
+ inflightBatchesBySequence = newInflights;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
new file mode 100644
index 0000000000..95553119c5
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionMap {
+
+ final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new
HashMap<>();
+
+ TxnPartitionEntry get(TopicPartition topicPartition) {
+ TxnPartitionEntry ent = topicPartitions.get(topicPartition);
+ if (ent == null) {
+ throw new IllegalStateException("Trying to get the sequence number
for " + topicPartition +
+ ", but the sequence number was never set for this partition.");
+ }
+ return ent;
+ }
+
+ TxnPartitionEntry getOrCreate(TopicPartition topicPartition) {
+ return topicPartitions.computeIfAbsent(topicPartition, tp -> new
TxnPartitionEntry());
+ }
+
+ boolean contains(TopicPartition topicPartition) {
+ return topicPartitions.containsKey(topicPartition);
+ }
+
+ void reset() {
+ topicPartitions.clear();
+ }
+
+ OptionalLong lastAckedOffset(TopicPartition topicPartition) {
+ TxnPartitionEntry entry = topicPartitions.get(topicPartition);
+ if (entry != null && entry.lastAckedOffset !=
ProduceResponse.INVALID_OFFSET) {
+ return OptionalLong.of(entry.lastAckedOffset);
+ } else {
+ return OptionalLong.empty();
+ }
+ }
+
+ OptionalInt lastAckedSequence(TopicPartition topicPartition) {
+ TxnPartitionEntry entry = topicPartitions.get(topicPartition);
+ if (entry != null && entry.lastAckedSequence !=
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER) {
+ return OptionalInt.of(entry.lastAckedSequence);
+ } else {
+ return OptionalInt.empty();
+ }
+ }
+
+ void startSequencesAtBeginning(TopicPartition topicPartition,
ProducerIdAndEpoch newProducerIdAndEpoch) {
+ final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+ TxnPartitionEntry topicPartitionEntry = get(topicPartition);
+ topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> {
+ inFlightBatch.resetProducerState(newProducerIdAndEpoch,
sequence.value, inFlightBatch.isTransactional());
+ sequence.value += inFlightBatch.recordCount;
+ });
+ topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
+ topicPartitionEntry.nextSequence = sequence.value;
+ topicPartitionEntry.lastAckedSequence =
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+ }
+}