This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 4e814c7 KAFKA-12152; Idempotent Producer does not reset the sequence
number of partitions without in-flight batches (#9832)
4e814c7 is described below
commit 4e814c75bf85d365004d901c7aef2e9a1780cc7d
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 21 09:38:19 2021 +0100
KAFKA-12152; Idempotent Producer does not reset the sequence number of
partitions without in-flight batches (#9832)
When an `OutOfOrderSequenceException` error is received by an idempotent
producer for a partition, the producer bumps its epoch, adjusts the sequence
number and the epoch of the in-flight batches of the partitions affected by the
`OutOfOrderSequenceException` error. This happens in
`TransactionManager#bumpIdempotentProducerEpoch`.
The remaining partitions are treated separately. When the last in-flight
batch of a given partition is completed, the sequence number is reset. This
happens in `TransactionManager#handleCompletedBatch`.
However, when a given partition does not have in-flight batches when the
producer epoch is bumped, its sequence number is not reset. Similarly, when an
in-flight batch eventually fails after the producer epoch is bumped, the batch
is discarded and the sequence number for its partition is not reset. In both cases,
it results in subsequent produce requests using the new producer epoch
with the old sequence number and being rejected by the broker.
With this patch, the producer id/epoch is now stored in the partition
state. This ensures that the producer id/epoch of a given partition remains
consistent with its sequence number. When the producer epoch is bumped, the
producer epoch of the partition is lazily updated and its sequence number is
reset accordingly. This happens only when all the in-flight batches have been
resolved.
Reviewers: Bob Barrett <[email protected]>, Jason Gustafson
<[email protected]>
---
.../producer/internals/RecordAccumulator.java | 14 +-
.../producer/internals/TransactionManager.java | 59 +++--
.../clients/producer/internals/SenderTest.java | 279 ++++++++++++++++++++-
.../producer/internals/TransactionManagerTest.java | 5 +
4 files changed, 316 insertions(+), 41 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 3781297..efccf30 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -508,15 +508,11 @@ public final class RecordAccumulator {
return true;
if (!first.hasSequence()) {
- if (transactionManager.hasInflightBatches(tp)) {
+ if (transactionManager.hasInflightBatches(tp) &&
transactionManager.hasStaleProducerIdAndEpoch(tp)) {
// Don't drain any new batches while the partition has
in-flight batches with a different epoch
// and/or producer ID. Otherwise, a batch with a new epoch
and sequence number
// 0 could be written before earlier batches complete,
which would cause out of sequence errors
- ProducerBatch firstInFlightBatch =
transactionManager.nextBatchBySequence(tp);
-
- if (firstInFlightBatch != null &&
transactionManager.producerIdOrEpochNotMatch(firstInFlightBatch)) {
- return true;
- }
+ return true;
}
if
(transactionManager.hasUnresolvedSequence(first.topicPartition))
@@ -582,6 +578,12 @@ public final class RecordAccumulator {
transactionManager != null ?
transactionManager.producerIdAndEpoch() : null;
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.hasSequence()) {
+ // If the the producer id/epoch of the partition do
not match the latest one
+ // of the producer, we update it and reset the
sequence. This should be
+ // only done when all its in-flight batches have
completed. This is guarantee
+ // in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
// If the batch already has an assigned sequence, then
we should not change the producer id and
// sequence number, since this may introduce
duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change
the producer id and sequence here, this
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index d77ad41..7e7fb4c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -111,6 +111,15 @@ public class TransactionManager {
return ent;
}
+ private TopicPartitionEntry getOrCreatePartition(TopicPartition
topicPartition) {
+ TopicPartitionEntry ent = topicPartitions.get(topicPartition);
+ if (ent == null) {
+ ent = new TopicPartitionEntry();
+ topicPartitions.put(topicPartition, ent);
+ }
+ return ent;
+ }
+
private void addPartition(TopicPartition topicPartition) {
this.topicPartitions.putIfAbsent(topicPartition, new
TopicPartitionEntry());
}
@@ -146,6 +155,7 @@ public class TransactionManager {
inFlightBatch.resetProducerState(newProducerIdAndEpoch,
sequence.value, inFlightBatch.isTransactional());
sequence.value += inFlightBatch.recordCount;
});
+ topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
topicPartitionEntry.nextSequence = sequence.value;
topicPartitionEntry.lastAckedSequence =
NO_LAST_ACKED_SEQUENCE_NUMBER;
}
@@ -153,6 +163,9 @@ public class TransactionManager {
private static class TopicPartitionEntry {
+ // The producer id/epoch being used for a given partition.
+ private ProducerIdAndEpoch producerIdAndEpoch;
+
// The base sequence of the next batch bound for a given partition.
private int nextSequence;
@@ -171,6 +184,7 @@ public class TransactionManager {
private long lastAckedOffset;
TopicPartitionEntry() {
+ this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.nextSequence = 0;
this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
@@ -512,9 +526,14 @@ public class TransactionManager {
return producerIdAndEpoch;
}
- boolean producerIdOrEpochNotMatch(ProducerBatch batch) {
- ProducerIdAndEpoch idAndEpoch = this.producerIdAndEpoch;
- return idAndEpoch.producerId != batch.producerId() || idAndEpoch.epoch
!= batch.producerEpoch();
+ synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition
topicPartition) {
+ if (hasStaleProducerIdAndEpoch(topicPartition) &&
!hasInflightBatches(topicPartition)) {
+ // If the batch was on a different ID and/or epoch (due to an
epoch bump) and all its in-flight batches
+ // have completed, reset the partition sequence so that the next
batch (with the new epoch) starts from 0
+ topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition,
this.producerIdAndEpoch);
+ log.debug("ProducerId of partition {} set to {} with epoch {}.
Reinitialize sequence at beginning.",
+ topicPartition, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch);
+ }
}
/**
@@ -567,8 +586,8 @@ public class TransactionManager {
this.topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition,
this.producerIdAndEpoch);
this.partitionsWithUnresolvedSequences.remove(topicPartition);
}
-
this.partitionsToRewriteSequences.clear();
+
epochBumpRequired = false;
}
@@ -592,10 +611,14 @@ public class TransactionManager {
* Returns the next sequence number to be written to the given
TopicPartition.
*/
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
- if (!isTransactional())
- topicPartitionBookkeeper.addPartition(topicPartition);
+ return
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).nextSequence;
+ }
- return
topicPartitionBookkeeper.getPartition(topicPartition).nextSequence;
+ /**
+ * Returns the current producer id/epoch of the given TopicPartition.
+ */
+ synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition
topicPartition) {
+ return
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch;
}
synchronized void incrementSequenceNumber(TopicPartition topicPartition,
int increment) {
@@ -685,12 +708,6 @@ public class TransactionManager {
updateLastAckedOffset(response, batch);
removeInFlightBatch(batch);
-
- if (producerIdOrEpochNotMatch(batch) &&
!hasInflightBatches(batch.topicPartition)) {
- // If the batch was on a different ID and/or epoch (due to an
epoch bump) and all its in-flight batches
- // have completed, reset the partition sequence so that the next
batch (with the new epoch) starts from 0
-
topicPartitionBookkeeper.startSequencesAtBeginning(batch.topicPartition,
this.producerIdAndEpoch);
- }
}
private void maybeTransitionToErrorState(RuntimeException exception) {
@@ -718,13 +735,6 @@ public class TransactionManager {
return;
}
- if (producerIdOrEpochNotMatch(batch)) {
- log.debug("Ignoring failed batch {} with producer id {}, epoch {},
and sequence number {} " +
- "since the producerId has been reset internally",
batch, batch.producerId(),
- batch.producerEpoch(), batch.baseSequence(), exception);
- return;
- }
-
if (exception instanceof OutOfOrderSequenceException &&
!isTransactional()) {
log.error("The broker returned {} for topic-partition {} with
producerId {}, epoch {}, and sequence number {}",
exception, batch.topicPartition, batch.producerId(),
batch.producerEpoch(), batch.baseSequence());
@@ -784,8 +794,11 @@ public class TransactionManager {
}
synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
- return topicPartitionBookkeeper.contains(topicPartition)
- &&
!topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.isEmpty();
+ return
!topicPartitionBookkeeper.getOrCreatePartition(topicPartition).inflightBatchesBySequence.isEmpty();
+ }
+
+ synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition
topicPartition) {
+ return
!producerIdAndEpoch.equals(topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch);
}
synchronized boolean hasUnresolvedSequences() {
@@ -998,7 +1011,7 @@ public class TransactionManager {
return true;
} else if
(lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) <
response.logStartOffset) {
// The head of the log has been removed, probably due to the
retention time elapsing. In this case,
- // we expect to lose the producer state. For the transactional
procducer, reset the sequences of all
+ // we expect to lose the producer state. For the transactional
producer, reset the sequences of all
// inflight batches to be from the beginning and retry them,
so that the transaction does not need to
// be aborted. For the idempotent producer, bump the epoch to
avoid reusing (sequence, epoch) pairs
if (isTransactional()) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 10deb42..e861cc0 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -130,6 +130,7 @@ public class SenderTest {
private static final int MAX_BLOCK_TIMEOUT = 1000;
private static final int REQUEST_TIMEOUT = 1000;
private static final long RETRY_BACKOFF_MS = 50;
+ private static final int DELIVERY_TIMEOUT_MS = 1500;
private static final long TOPIC_IDLE_MS = 60 * 1000;
private TopicPartition tp0 = new TopicPartition("test", 0);
@@ -920,6 +921,232 @@ public class SenderTest {
}
@Test
+ public void
testEpochBumpOnOutOfOrderSequenceForNextBatchWhenThereIsNoBatchInFlight()
throws Exception {
+ // Verify that partitions without in-flight batches when the producer
epoch
+ // is bumped get their sequence number reset correctly.
+ final long producerId = 343434L;
+ TransactionManager transactionManager = createTransactionManager();
+ setupWithTransactionState(transactionManager);
+
+ // Init producer id/epoch
+ prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+ assertEquals(producerId,
transactionManager.producerIdAndEpoch().producerId);
+ assertEquals(0, transactionManager.producerIdAndEpoch().epoch);
+
+ // Partition 0 - Send first batch
+ appendToAccumulator(tp0);
+ sender.runOnce();
+
+ // Partition 0 - State is lazily initialized
+ assertPartitionState(transactionManager, tp0, producerId, (short) 0,
1, OptionalInt.empty());
+
+ // Partition 0 - Successful response
+ sendIdempotentProducerResponse(0, 0, tp0, Errors.NONE, 0, -1);
+ sender.runOnce();
+
+ // Partition 0 - Last ack is updated
+ assertPartitionState(transactionManager, tp0, producerId, (short) 0,
1, OptionalInt.of(0));
+
+ // Partition 1 - Send first batch
+ appendToAccumulator(tp1);
+ sender.runOnce();
+
+ // Partition 1 - State is lazily initialized
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
1, OptionalInt.empty());
+
+ // Partition 1 - Successful response
+ sendIdempotentProducerResponse(0, 0, tp1, Errors.NONE, 0, -1);
+ sender.runOnce();
+
+ // Partition 1 - Last ack is updated
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
1, OptionalInt.of(0));
+
+ // Partition 0 - Send second batch
+ appendToAccumulator(tp0);
+ sender.runOnce();
+
+ // Partition 0 - Sequence is incremented
+ assertPartitionState(transactionManager, tp0, producerId, (short) 0,
2, OptionalInt.of(0));
+
+ // Partition 0 - Failed response with OUT_OF_ORDER_SEQUENCE_NUMBER
+ sendIdempotentProducerResponse(0, 1, tp0,
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1, -1);
+ sender.runOnce(); // Receive
+ sender.runOnce(); // Bump epoch & Retry
+
+ // Producer epoch is bumped
+ assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
+
+ // Partition 0 - State is reset to current producer epoch
+ assertPartitionState(transactionManager, tp0, producerId, (short) 1,
1, OptionalInt.empty());
+
+ // Partition 1 - State is not changed
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
1, OptionalInt.of(0));
+ assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+ // Partition 0 - Successful Response
+ sendIdempotentProducerResponse(1, 0, tp0, Errors.NONE, 1, -1);
+ sender.runOnce();
+
+ // Partition 0 - Last ack is updated
+ assertPartitionState(transactionManager, tp0, producerId, (short) 1,
1, OptionalInt.of(0));
+
+ // Partition 1 - Send second batch
+ appendToAccumulator(tp1);
+ sender.runOnce();
+
+ // Partition 1 - Epoch is bumped, sequence is reset and incremented
+ assertPartitionState(transactionManager, tp1, producerId, (short) 1,
1, OptionalInt.empty());
+ assertFalse(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+ // Partition 1 - Successful Response
+ sendIdempotentProducerResponse(1, 0, tp1, Errors.NONE, 1, -1);
+ sender.runOnce();
+
+ // Partition 1 - Last ack is updated
+ assertPartitionState(transactionManager, tp1, producerId, (short) 1,
1, OptionalInt.of(0));
+ }
+
+ @Test
+ public void
testEpochBumpOnOutOfOrderSequenceForNextBatchWhenBatchInFlightFails() throws
Exception {
+ // When a batch failed after the producer epoch is bumped, the
sequence number of
+ // that partition must be reset for any subsequent batches sent.
+ final long producerId = 343434L;
+ TransactionManager transactionManager = createTransactionManager();
+
+ // Retries once
+ setupWithTransactionState(transactionManager, false, null, true, 1);
+
+ // Init producer id/epoch
+ prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+ assertEquals(producerId,
transactionManager.producerIdAndEpoch().producerId);
+ assertEquals(0, transactionManager.producerIdAndEpoch().epoch);
+
+ // Partition 0 - Send first batch
+ appendToAccumulator(tp0);
+ sender.runOnce();
+
+ // Partition 0 - State is lazily initialized
+ assertPartitionState(transactionManager, tp0, producerId, (short) 0,
1, OptionalInt.empty());
+
+ // Partition 0 - Successful response
+ sendIdempotentProducerResponse(0, 0, tp0, Errors.NONE, 0, -1);
+ sender.runOnce();
+
+ // Partition 0 - Last ack is updated
+ assertPartitionState(transactionManager, tp0, producerId, (short) 0,
1, OptionalInt.of(0));
+
+ // Partition 1 - Send first batch
+ appendToAccumulator(tp1);
+ sender.runOnce();
+
+ // Partition 1 - State is lazily initialized
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
1, OptionalInt.empty());
+
+ // Partition 1 - Successful response
+ sendIdempotentProducerResponse(0, 0, tp1, Errors.NONE, 0, -1);
+ sender.runOnce();
+
+ // Partition 1 - Last ack is updated
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
1, OptionalInt.of(0));
+
+ // Partition 0 - Send second batch
+ appendToAccumulator(tp0);
+ sender.runOnce();
+
+ // Partition 0 - Sequence is incremented
+ assertPartitionState(transactionManager, tp0, producerId, (short) 0,
2, OptionalInt.of(0));
+
+ // Partition 1 - Send second batch
+ appendToAccumulator(tp1);
+ sender.runOnce();
+
+ // Partition 1 - Sequence is incremented
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
2, OptionalInt.of(0));
+
+ // Partition 0 - Failed response with OUT_OF_ORDER_SEQUENCE_NUMBER
+ sendIdempotentProducerResponse(0, 1, tp0,
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1, -1);
+ sender.runOnce(); // Receive
+ sender.runOnce(); // Bump epoch & Retry
+
+ // Producer epoch is bumped
+ assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
+
+ // Partition 0 - State is reset to current producer epoch
+ assertPartitionState(transactionManager, tp0, producerId, (short) 1,
1, OptionalInt.empty());
+
+ // Partition 1 - State is not changed. The epoch will be lazily bumped
when all in-flight
+ // batches are completed
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
2, OptionalInt.of(0));
+ assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+ // Partition 1 - Failed response with NOT_LEADER_OR_FOLLOWER
+ sendIdempotentProducerResponse(0, 1, tp1,
Errors.NOT_LEADER_OR_FOLLOWER, -1, -1);
+ sender.runOnce(); // Receive & Retry
+
+ // Partition 1 - State is not changed.
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
2, OptionalInt.of(0));
+ assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+ // Partition 0 - Successful Response
+ sendIdempotentProducerResponse(1, 0, tp0, Errors.NONE, 1, -1);
+ sender.runOnce();
+
+ // Partition 0 - Last ack is updated
+ assertPartitionState(transactionManager, tp0, producerId, (short) 1,
1, OptionalInt.of(0));
+
+ // Partition 1 - Failed response with NOT_LEADER_OR_FOLLOWER
+ sendIdempotentProducerResponse(0, 1, tp1,
Errors.NOT_LEADER_OR_FOLLOWER, -1, -1);
+ sender.runOnce(); // Receive & Fail the batch (retries exhausted)
+
+ // Partition 1 - State is not changed. It will be lazily updated when
the next batch is sent.
+ assertPartitionState(transactionManager, tp1, producerId, (short) 0,
2, OptionalInt.of(0));
+ assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+ // Partition 1 - Send third batch
+ appendToAccumulator(tp1);
+ sender.runOnce();
+
+ // Partition 1 - Epoch is bumped, sequence is reset
+ assertPartitionState(transactionManager, tp1, producerId, (short) 1,
1, OptionalInt.empty());
+ assertFalse(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+ // Partition 1 - Successful Response
+ sendIdempotentProducerResponse(1, 0, tp1, Errors.NONE, 0, -1);
+ sender.runOnce();
+
+ // Partition 1 - Last ack is updated
+ assertPartitionState(transactionManager, tp1, producerId, (short) 1,
1, OptionalInt.of(0));
+
+ // Partition 0 - Send third batch
+ appendToAccumulator(tp0);
+ sender.runOnce();
+
+ // Partition 0 - Sequence is incremented
+ assertPartitionState(transactionManager, tp0, producerId, (short) 1,
2, OptionalInt.of(0));
+
+ // Partition 0 - Successful Response
+ sendIdempotentProducerResponse(1, 1, tp0, Errors.NONE, 0, -1);
+ sender.runOnce();
+
+ // Partition 0 - Last ack is updated
+ assertPartitionState(transactionManager, tp0, producerId, (short) 1,
2, OptionalInt.of(1));
+ }
+
+ private void assertPartitionState(
+ TransactionManager transactionManager,
+ TopicPartition tp,
+ long expectedProducerId,
+ short expectedProducerEpoch,
+ long expectedSequenceValue,
+ OptionalInt expectedLastAckedSequence
+ ) {
+ assertEquals("Producer Id:", expectedProducerId,
transactionManager.producerIdAndEpoch(tp).producerId);
+ assertEquals("Producer Epoch:", expectedProducerEpoch,
transactionManager.producerIdAndEpoch(tp).epoch);
+ assertEquals("Seq Number:", expectedSequenceValue,
transactionManager.sequenceNumber(tp).longValue());
+ assertEquals("Last Acked Seq Number:", expectedLastAckedSequence,
transactionManager.lastAckedSequence(tp));
+ }
+
+ @Test
public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
final long producerId = 343434L;
TransactionManager transactionManager = createTransactionManager();
@@ -1326,8 +1553,8 @@ public class SenderTest {
assertTrue(successfulResponse.isDone());
assertEquals(10, successfulResponse.get().offset());
- // Since the response came back for the old producer id, we shouldn't
update the next sequence.
- assertEquals(0, transactionManager.sequenceNumber(tp1).longValue());
+ // The epoch and the sequence are updated when the next batch is sent.
+ assertEquals(1, transactionManager.sequenceNumber(tp1).longValue());
}
@Test
@@ -1438,7 +1665,8 @@ public class SenderTest {
client.respond(produceResponse(tp1, 0, Errors.NONE, -1));
sender.runOnce();
assertTrue(successfulResponse.isDone());
- assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
+ // epoch of partition is bumped and sequence is reset when the next
batch is sent
+ assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
}
@Test
@@ -1801,19 +2029,32 @@ public class SenderTest {
assertEquals(OptionalInt.empty(),
transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
}
+
void sendIdempotentProducerResponse(int expectedSequence, TopicPartition
tp, Errors responseError, long responseOffset) {
sendIdempotentProducerResponse(expectedSequence, tp, responseError,
responseOffset, -1L);
}
- void sendIdempotentProducerResponse(final int expectedSequence,
TopicPartition tp, Errors responseError, long responseOffset, long
logStartOffset) {
+ void sendIdempotentProducerResponse(int expectedSequence, TopicPartition
tp, Errors responseError, long responseOffset, long logStartOffset) {
+ sendIdempotentProducerResponse(-1, expectedSequence, tp,
responseError, responseOffset, logStartOffset);
+ }
+
+ void sendIdempotentProducerResponse(
+ int expectedEpoch,
+ int expectedSequence,
+ TopicPartition tp,
+ Errors responseError,
+ long responseOffset,
+ long logStartOffset
+ ) {
client.respond(body -> {
ProduceRequest produceRequest = (ProduceRequest) body;
assertTrue(produceRequest.hasIdempotentRecords());
-
- MemoryRecords records =
produceRequest.partitionRecordsOrFail().get(tp0);
+ MemoryRecords records =
produceRequest.partitionRecordsOrFail().get(tp);
Iterator<MutableRecordBatch> batchIterator =
records.batches().iterator();
RecordBatch firstBatch = batchIterator.next();
assertFalse(batchIterator.hasNext());
+ if (expectedEpoch > -1)
+ assertEquals((short) expectedEpoch,
firstBatch.producerEpoch());
assertEquals(expectedSequence, firstBatch.baseSequence());
return true;
}, produceResponse(tp, responseOffset, responseError, 0,
logStartOffset, null));
@@ -2666,15 +2907,29 @@ public class SenderTest {
}
private void setupWithTransactionState(TransactionManager
transactionManager) {
- setupWithTransactionState(transactionManager, false, null, true);
+ setupWithTransactionState(transactionManager, false, null, true,
Integer.MAX_VALUE);
}
private void setupWithTransactionState(TransactionManager
transactionManager, boolean guaranteeOrder, BufferPool customPool) {
- setupWithTransactionState(transactionManager, guaranteeOrder,
customPool, true);
+ setupWithTransactionState(transactionManager, guaranteeOrder,
customPool, true, Integer.MAX_VALUE);
+ }
+
+ private void setupWithTransactionState(
+ TransactionManager transactionManager,
+ boolean guaranteeOrder,
+ BufferPool customPool,
+ boolean updateMetadata
+ ) {
+ setupWithTransactionState(transactionManager, guaranteeOrder,
customPool, updateMetadata, Integer.MAX_VALUE);
}
- private void setupWithTransactionState(TransactionManager
transactionManager, boolean guaranteeOrder, BufferPool customPool, boolean
updateMetadata) {
- int deliveryTimeoutMs = 1500;
+ private void setupWithTransactionState(
+ TransactionManager transactionManager,
+ boolean guaranteeOrder,
+ BufferPool customPool,
+ boolean updateMetadata,
+ int retries
+ ) {
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
MetricConfig metricConfig = new
MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
@@ -2682,10 +2937,10 @@ public class SenderTest {
BufferPool pool = (customPool == null) ? new BufferPool(totalSize,
batchSize, metrics, time, metricGrpName) : customPool;
this.accumulator = new RecordAccumulator(logContext, batchSize,
CompressionType.NONE, 0, 0L,
- deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions,
transactionManager, pool);
+ DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions,
transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata,
this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
- Integer.MAX_VALUE, this.senderMetricsRegistry, this.time,
REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, transactionManager, apiVersions);
metadata.add("test", time.milliseconds());
if (updateMetadata)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index ee3f892..3464759 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -685,6 +685,7 @@ public class TransactionManagerTest {
Errors.NONE, 500L, time.milliseconds(), 0L);
transactionManager.handleCompletedBatch(b2, b2Response);
+ transactionManager.maybeUpdateProducerIdAndEpoch(tp0);
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertFalse(transactionManager.lastAckedSequence(tp0).isPresent());
assertNull(transactionManager.nextBatchBySequence(tp0));
@@ -693,6 +694,7 @@ public class TransactionManagerTest {
private ProducerBatch writeIdempotentBatchWithValue(TransactionManager
manager,
TopicPartition tp,
String value) {
+ manager.maybeUpdateProducerIdAndEpoch(tp);
int seq = manager.sequenceNumber(tp);
manager.incrementSequenceNumber(tp, 1);
ProducerBatch batch = batchWithValue(tp, value);
@@ -3117,6 +3119,7 @@ public class TransactionManagerTest {
tp1b2.done(500L, b1AppendTime, null);
transactionManager.handleCompletedBatch(tp1b2, t1b2Response);
+ transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
@@ -3131,6 +3134,7 @@ public class TransactionManagerTest {
tp1b3.done(500L, b1AppendTime, null);
transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
+ transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
}
@@ -3239,6 +3243,7 @@ public class TransactionManagerTest {
tp1b2.done(500L, b1AppendTime, null);
transactionManager.handleCompletedBatch(tp1b2, t1b2Response);
+ transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());