This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 4e814c7  KAFKA-12152; Idempotent Producer does not reset the sequence 
number of partitions without in-flight batches (#9832)
4e814c7 is described below

commit 4e814c75bf85d365004d901c7aef2e9a1780cc7d
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 21 09:38:19 2021 +0100

    KAFKA-12152; Idempotent Producer does not reset the sequence number of 
partitions without in-flight batches (#9832)
    
    When an `OutOfOrderSequenceException` error is received by an idempotent 
producer for a partition, the producer bumps its epoch and adjusts the sequence 
number and the epoch of the in-flight batches of the partitions affected by the 
`OutOfOrderSequenceException` error. This happens in 
`TransactionManager#bumpIdempotentProducerEpoch`.
    
    The remaining partitions are treated separately. When the last in-flight 
batch of a given partition is completed, the sequence number is reset. This 
happens in `TransactionManager#handleCompletedBatch`.
    
    However, when a given partition does not have in-flight batches when the 
producer epoch is bumped, its sequence number is not reset. Similarly, when an 
in-flight batch eventually fails after the producer epoch is bumped, the batch 
is discarded and the sequence number for its partition is not reset. In both 
cases, subsequent produce requests use the new producer epoch with the old 
sequence number and are rejected by the broker.
    
    With this patch, the producer id/epoch is now stored in the partition 
state. This ensures that the producer id/epoch of a given partition remains 
consistent with its sequence number. When the producer epoch is bumped, the 
producer epoch of the partition is lazily updated and its sequence number is 
reset accordingly. This happens only when all the in-flight batches have been 
resolved.
    
    Reviewers: Bob Barrett <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../producer/internals/RecordAccumulator.java      |  14 +-
 .../producer/internals/TransactionManager.java     |  59 +++--
 .../clients/producer/internals/SenderTest.java     | 279 ++++++++++++++++++++-
 .../producer/internals/TransactionManagerTest.java |   5 +
 4 files changed, 316 insertions(+), 41 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 3781297..efccf30 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -508,15 +508,11 @@ public final class RecordAccumulator {
                 return true;
 
             if (!first.hasSequence()) {
-                if (transactionManager.hasInflightBatches(tp)) {
+                if (transactionManager.hasInflightBatches(tp) && 
transactionManager.hasStaleProducerIdAndEpoch(tp)) {
                     // Don't drain any new batches while the partition has 
in-flight batches with a different epoch
                     // and/or producer ID. Otherwise, a batch with a new epoch 
and sequence number
                     // 0 could be written before earlier batches complete, 
which would cause out of sequence errors
-                    ProducerBatch firstInFlightBatch = 
transactionManager.nextBatchBySequence(tp);
-
-                    if (firstInFlightBatch != null && 
transactionManager.producerIdOrEpochNotMatch(firstInFlightBatch)) {
-                        return true;
-                    }
+                    return true;
                 }
 
                 if 
(transactionManager.hasUnresolvedSequence(first.topicPartition))
@@ -582,6 +578,12 @@ public final class RecordAccumulator {
                         transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
                     ProducerBatch batch = deque.pollFirst();
                     if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                        // If the the producer id/epoch of the partition do 
not match the latest one
+                        // of the producer, we update it and reset the 
sequence. This should be
+                        // only done when all its in-flight batches have 
completed. This is guarantee
+                        // in `shouldStopDrainBatchesForPartition`.
+                        
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
                         // If the batch already has an assigned sequence, then 
we should not change the producer id and
                         // sequence number, since this may introduce 
duplicates. In particular, the previous attempt
                         // may actually have been accepted, and if we change 
the producer id and sequence here, this
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index d77ad41..7e7fb4c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -111,6 +111,15 @@ public class TransactionManager {
             return ent;
         }
 
+        private TopicPartitionEntry getOrCreatePartition(TopicPartition 
topicPartition) {
+            TopicPartitionEntry ent = topicPartitions.get(topicPartition);
+            if (ent == null) {
+                ent = new TopicPartitionEntry();
+                topicPartitions.put(topicPartition, ent);
+            }
+            return ent;
+        }
+
         private void addPartition(TopicPartition topicPartition) {
             this.topicPartitions.putIfAbsent(topicPartition, new 
TopicPartitionEntry());
         }
@@ -146,6 +155,7 @@ public class TransactionManager {
                 inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value, inFlightBatch.isTransactional());
                 sequence.value += inFlightBatch.recordCount;
             });
+            topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
             topicPartitionEntry.nextSequence = sequence.value;
             topicPartitionEntry.lastAckedSequence = 
NO_LAST_ACKED_SEQUENCE_NUMBER;
         }
@@ -153,6 +163,9 @@ public class TransactionManager {
 
     private static class TopicPartitionEntry {
 
+        // The producer id/epoch being used for a given partition.
+        private ProducerIdAndEpoch producerIdAndEpoch;
+
         // The base sequence of the next batch bound for a given partition.
         private int nextSequence;
 
@@ -171,6 +184,7 @@ public class TransactionManager {
         private long lastAckedOffset;
 
         TopicPartitionEntry() {
+            this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
             this.nextSequence = 0;
             this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
             this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
@@ -512,9 +526,14 @@ public class TransactionManager {
         return producerIdAndEpoch;
     }
 
-    boolean producerIdOrEpochNotMatch(ProducerBatch batch) {
-        ProducerIdAndEpoch idAndEpoch = this.producerIdAndEpoch;
-        return idAndEpoch.producerId != batch.producerId() || idAndEpoch.epoch 
!= batch.producerEpoch();
+    synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition 
topicPartition) {
+        if (hasStaleProducerIdAndEpoch(topicPartition) && 
!hasInflightBatches(topicPartition)) {
+            // If the batch was on a different ID and/or epoch (due to an 
epoch bump) and all its in-flight batches
+            // have completed, reset the partition sequence so that the next 
batch (with the new epoch) starts from 0
+            topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition, 
this.producerIdAndEpoch);
+            log.debug("ProducerId of partition {} set to {} with epoch {}. 
Reinitialize sequence at beginning.",
+                      topicPartition, producerIdAndEpoch.producerId, 
producerIdAndEpoch.epoch);
+        }
     }
 
     /**
@@ -567,8 +586,8 @@ public class TransactionManager {
             
this.topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition, 
this.producerIdAndEpoch);
             this.partitionsWithUnresolvedSequences.remove(topicPartition);
         }
-
         this.partitionsToRewriteSequences.clear();
+
         epochBumpRequired = false;
     }
 
@@ -592,10 +611,14 @@ public class TransactionManager {
      * Returns the next sequence number to be written to the given 
TopicPartition.
      */
     synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        if (!isTransactional())
-            topicPartitionBookkeeper.addPartition(topicPartition);
+        return 
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).nextSequence;
+    }
 
-        return 
topicPartitionBookkeeper.getPartition(topicPartition).nextSequence;
+    /**
+     * Returns the current producer id/epoch of the given TopicPartition.
+     */
+    synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition 
topicPartition) {
+        return 
topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch;
     }
 
     synchronized void incrementSequenceNumber(TopicPartition topicPartition, 
int increment) {
@@ -685,12 +708,6 @@ public class TransactionManager {
 
         updateLastAckedOffset(response, batch);
         removeInFlightBatch(batch);
-
-        if (producerIdOrEpochNotMatch(batch) && 
!hasInflightBatches(batch.topicPartition)) {
-            // If the batch was on a different ID and/or epoch (due to an 
epoch bump) and all its in-flight batches
-            // have completed, reset the partition sequence so that the next 
batch (with the new epoch) starts from 0
-            
topicPartitionBookkeeper.startSequencesAtBeginning(batch.topicPartition, 
this.producerIdAndEpoch);
-        }
     }
 
     private void maybeTransitionToErrorState(RuntimeException exception) {
@@ -718,13 +735,6 @@ public class TransactionManager {
             return;
         }
 
-        if (producerIdOrEpochNotMatch(batch)) {
-            log.debug("Ignoring failed batch {} with producer id {}, epoch {}, 
and sequence number {} " +
-                            "since the producerId has been reset internally", 
batch, batch.producerId(),
-                    batch.producerEpoch(), batch.baseSequence(), exception);
-            return;
-        }
-
         if (exception instanceof OutOfOrderSequenceException && 
!isTransactional()) {
             log.error("The broker returned {} for topic-partition {} with 
producerId {}, epoch {}, and sequence number {}",
                     exception, batch.topicPartition, batch.producerId(), 
batch.producerEpoch(), batch.baseSequence());
@@ -784,8 +794,11 @@ public class TransactionManager {
     }
 
     synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
-        return topicPartitionBookkeeper.contains(topicPartition)
-                && 
!topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.isEmpty();
+        return 
!topicPartitionBookkeeper.getOrCreatePartition(topicPartition).inflightBatchesBySequence.isEmpty();
+    }
+
+    synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition 
topicPartition) {
+        return 
!producerIdAndEpoch.equals(topicPartitionBookkeeper.getOrCreatePartition(topicPartition).producerIdAndEpoch);
     }
 
     synchronized boolean hasUnresolvedSequences() {
@@ -998,7 +1011,7 @@ public class TransactionManager {
                 return true;
             } else if 
(lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < 
response.logStartOffset) {
                 // The head of the log has been removed, probably due to the 
retention time elapsing. In this case,
-                // we expect to lose the producer state. For the transactional 
procducer, reset the sequences of all
+                // we expect to lose the producer state. For the transactional 
producer, reset the sequences of all
                 // inflight batches to be from the beginning and retry them, 
so that the transaction does not need to
                 // be aborted. For the idempotent producer, bump the epoch to 
avoid reusing (sequence, epoch) pairs
                 if (isTransactional()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 10deb42..e861cc0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -130,6 +130,7 @@ public class SenderTest {
     private static final int MAX_BLOCK_TIMEOUT = 1000;
     private static final int REQUEST_TIMEOUT = 1000;
     private static final long RETRY_BACKOFF_MS = 50;
+    private static final int DELIVERY_TIMEOUT_MS = 1500;
     private static final long TOPIC_IDLE_MS = 60 * 1000;
 
     private TopicPartition tp0 = new TopicPartition("test", 0);
@@ -920,6 +921,232 @@ public class SenderTest {
     }
 
     @Test
+    public void 
testEpochBumpOnOutOfOrderSequenceForNextBatchWhenThereIsNoBatchInFlight() 
throws Exception {
+        // Verify that partitions without in-flight batches when the producer 
epoch
+        // is bumped get their sequence number reset correctly.
+        final long producerId = 343434L;
+        TransactionManager transactionManager = createTransactionManager();
+        setupWithTransactionState(transactionManager);
+
+        // Init producer id/epoch
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertEquals(producerId, 
transactionManager.producerIdAndEpoch().producerId);
+        assertEquals(0, transactionManager.producerIdAndEpoch().epoch);
+
+        // Partition 0 - Send first batch
+        appendToAccumulator(tp0);
+        sender.runOnce();
+
+        // Partition 0 - State is lazily initialized
+        assertPartitionState(transactionManager, tp0, producerId, (short) 0, 
1, OptionalInt.empty());
+
+        // Partition 0 - Successful response
+        sendIdempotentProducerResponse(0, 0, tp0, Errors.NONE, 0, -1);
+        sender.runOnce();
+
+        // Partition 0 - Last ack is updated
+        assertPartitionState(transactionManager, tp0, producerId, (short) 0, 
1, OptionalInt.of(0));
+
+        // Partition 1 - Send first batch
+        appendToAccumulator(tp1);
+        sender.runOnce();
+
+        // Partition 1 - State is lazily initialized
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
1, OptionalInt.empty());
+
+        // Partition 1 - Successful response
+        sendIdempotentProducerResponse(0, 0, tp1, Errors.NONE, 0, -1);
+        sender.runOnce();
+
+        // Partition 1 - Last ack is updated
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
1, OptionalInt.of(0));
+
+        // Partition 0 - Send second batch
+        appendToAccumulator(tp0);
+        sender.runOnce();
+
+        // Partition 0 - Sequence is incremented
+        assertPartitionState(transactionManager, tp0, producerId, (short) 0, 
2, OptionalInt.of(0));
+
+        // Partition 0 - Failed response with OUT_OF_ORDER_SEQUENCE_NUMBER
+        sendIdempotentProducerResponse(0, 1, tp0, 
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1, -1);
+        sender.runOnce(); // Receive
+        sender.runOnce(); // Bump epoch & Retry
+
+        // Producer epoch is bumped
+        assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
+
+        // Partition 0 - State is reset to current producer epoch
+        assertPartitionState(transactionManager, tp0, producerId, (short) 1, 
1, OptionalInt.empty());
+
+        // Partition 1 - State is not changed
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
1, OptionalInt.of(0));
+        assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+        // Partition 0 - Successful Response
+        sendIdempotentProducerResponse(1, 0, tp0, Errors.NONE, 1, -1);
+        sender.runOnce();
+
+        // Partition 0 - Last ack is updated
+        assertPartitionState(transactionManager, tp0, producerId, (short) 1, 
1, OptionalInt.of(0));
+
+        // Partition 1 - Send second batch
+        appendToAccumulator(tp1);
+        sender.runOnce();
+
+        // Partition 1 - Epoch is bumped, sequence is reset and incremented
+        assertPartitionState(transactionManager, tp1, producerId, (short) 1, 
1, OptionalInt.empty());
+        assertFalse(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+        // Partition 1 - Successful Response
+        sendIdempotentProducerResponse(1, 0, tp1, Errors.NONE, 1, -1);
+        sender.runOnce();
+
+        // Partition 1 - Last ack is updated
+        assertPartitionState(transactionManager, tp1, producerId, (short) 1, 
1, OptionalInt.of(0));
+    }
+
+    @Test
+    public void 
testEpochBumpOnOutOfOrderSequenceForNextBatchWhenBatchInFlightFails() throws 
Exception {
+        // When a batch failed after the producer epoch is bumped, the 
sequence number of
+        // that partition must be reset for any subsequent batches sent.
+        final long producerId = 343434L;
+        TransactionManager transactionManager = createTransactionManager();
+
+        // Retries once
+        setupWithTransactionState(transactionManager, false, null, true, 1);
+
+        // Init producer id/epoch
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertEquals(producerId, 
transactionManager.producerIdAndEpoch().producerId);
+        assertEquals(0, transactionManager.producerIdAndEpoch().epoch);
+
+        // Partition 0 - Send first batch
+        appendToAccumulator(tp0);
+        sender.runOnce();
+
+        // Partition 0 - State is lazily initialized
+        assertPartitionState(transactionManager, tp0, producerId, (short) 0, 
1, OptionalInt.empty());
+
+        // Partition 0 - Successful response
+        sendIdempotentProducerResponse(0, 0, tp0, Errors.NONE, 0, -1);
+        sender.runOnce();
+
+        // Partition 0 - Last ack is updated
+        assertPartitionState(transactionManager, tp0, producerId, (short) 0, 
1, OptionalInt.of(0));
+
+        // Partition 1 - Send first batch
+        appendToAccumulator(tp1);
+        sender.runOnce();
+
+        // Partition 1 - State is lazily initialized
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
1, OptionalInt.empty());
+
+        // Partition 1 - Successful response
+        sendIdempotentProducerResponse(0, 0, tp1, Errors.NONE, 0, -1);
+        sender.runOnce();
+
+        // Partition 1 - Last ack is updated
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
1, OptionalInt.of(0));
+
+        // Partition 0 - Send second batch
+        appendToAccumulator(tp0);
+        sender.runOnce();
+
+        // Partition 0 - Sequence is incremented
+        assertPartitionState(transactionManager, tp0, producerId, (short) 0, 
2, OptionalInt.of(0));
+
+        // Partition 1 - Send second batch
+        appendToAccumulator(tp1);
+        sender.runOnce();
+
+        // Partition 1 - Sequence is incremented
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
2, OptionalInt.of(0));
+
+        // Partition 0 - Failed response with OUT_OF_ORDER_SEQUENCE_NUMBER
+        sendIdempotentProducerResponse(0, 1, tp0, 
Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1, -1);
+        sender.runOnce(); // Receive
+        sender.runOnce(); // Bump epoch & Retry
+
+        // Producer epoch is bumped
+        assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
+
+        // Partition 0 - State is reset to current producer epoch
+        assertPartitionState(transactionManager, tp0, producerId, (short) 1, 
1, OptionalInt.empty());
+
+        // Partition 1 - State is not changed. The epoch will be lazily bumped 
when all in-flight
+        // batches are completed
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
2, OptionalInt.of(0));
+        assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+        // Partition 1 - Failed response with NOT_LEADER_OR_FOLLOWER
+        sendIdempotentProducerResponse(0, 1, tp1, 
Errors.NOT_LEADER_OR_FOLLOWER, -1, -1);
+        sender.runOnce(); // Receive & Retry
+
+        // Partition 1 - State is not changed.
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
2, OptionalInt.of(0));
+        assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+        // Partition 0 - Successful Response
+        sendIdempotentProducerResponse(1, 0, tp0, Errors.NONE, 1, -1);
+        sender.runOnce();
+
+        // Partition 0 - Last ack is updated
+        assertPartitionState(transactionManager, tp0, producerId, (short) 1, 
1, OptionalInt.of(0));
+
+        // Partition 1 - Failed response with NOT_LEADER_OR_FOLLOWER
+        sendIdempotentProducerResponse(0, 1, tp1, 
Errors.NOT_LEADER_OR_FOLLOWER, -1, -1);
+        sender.runOnce(); // Receive & Fail the batch (retries exhausted)
+
+        // Partition 1 - State is not changed. It will be lazily updated when 
the next batch is sent.
+        assertPartitionState(transactionManager, tp1, producerId, (short) 0, 
2, OptionalInt.of(0));
+        assertTrue(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+        // Partition 1 - Send third batch
+        appendToAccumulator(tp1);
+        sender.runOnce();
+
+        // Partition 1 - Epoch is bumped, sequence is reset
+        assertPartitionState(transactionManager, tp1, producerId, (short) 1, 
1, OptionalInt.empty());
+        assertFalse(transactionManager.hasStaleProducerIdAndEpoch(tp1));
+
+        // Partition 1 - Successful Response
+        sendIdempotentProducerResponse(1, 0, tp1, Errors.NONE, 0, -1);
+        sender.runOnce();
+
+        // Partition 1 - Last ack is updated
+        assertPartitionState(transactionManager, tp1, producerId, (short) 1, 
1, OptionalInt.of(0));
+
+        // Partition 0 - Send third batch
+        appendToAccumulator(tp0);
+        sender.runOnce();
+
+        // Partition 0 - Sequence is incremented
+        assertPartitionState(transactionManager, tp0, producerId, (short) 1, 
2, OptionalInt.of(0));
+
+        // Partition 0 - Successful Response
+        sendIdempotentProducerResponse(1, 1, tp0, Errors.NONE, 0, -1);
+        sender.runOnce();
+
+        // Partition 0 - Last ack is updated
+        assertPartitionState(transactionManager, tp0, producerId, (short) 1, 
2, OptionalInt.of(1));
+    }
+
+    private void assertPartitionState(
+        TransactionManager transactionManager,
+        TopicPartition tp,
+        long expectedProducerId,
+        short expectedProducerEpoch,
+        long expectedSequenceValue,
+        OptionalInt expectedLastAckedSequence
+    ) {
+        assertEquals("Producer Id:", expectedProducerId, 
transactionManager.producerIdAndEpoch(tp).producerId);
+        assertEquals("Producer Epoch:", expectedProducerEpoch, 
transactionManager.producerIdAndEpoch(tp).epoch);
+        assertEquals("Seq Number:", expectedSequenceValue, 
transactionManager.sequenceNumber(tp).longValue());
+        assertEquals("Last Acked Seq Number:", expectedLastAckedSequence, 
transactionManager.lastAckedSequence(tp));
+    }
+
+    @Test
     public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = createTransactionManager();
@@ -1326,8 +1553,8 @@ public class SenderTest {
         assertTrue(successfulResponse.isDone());
         assertEquals(10, successfulResponse.get().offset());
 
-        // Since the response came back for the old producer id, we shouldn't 
update the next sequence.
-        assertEquals(0, transactionManager.sequenceNumber(tp1).longValue());
+        // The epoch and the sequence are updated when the next batch is sent.
+        assertEquals(1, transactionManager.sequenceNumber(tp1).longValue());
     }
 
     @Test
@@ -1438,7 +1665,8 @@ public class SenderTest {
         client.respond(produceResponse(tp1, 0, Errors.NONE, -1));
         sender.runOnce();
         assertTrue(successfulResponse.isDone());
-        assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
+        // epoch of partition is bumped and sequence is reset when the next 
batch is sent
+        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
     @Test
@@ -1801,19 +2029,32 @@ public class SenderTest {
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         assertFalse(request2.isDone());
     }
+
     void sendIdempotentProducerResponse(int expectedSequence, TopicPartition 
tp, Errors responseError, long responseOffset) {
         sendIdempotentProducerResponse(expectedSequence, tp, responseError, 
responseOffset, -1L);
     }
 
-    void sendIdempotentProducerResponse(final int expectedSequence, 
TopicPartition tp, Errors responseError, long responseOffset, long 
logStartOffset) {
+    void sendIdempotentProducerResponse(int expectedSequence, TopicPartition 
tp, Errors responseError, long responseOffset, long logStartOffset) {
+        sendIdempotentProducerResponse(-1, expectedSequence, tp, 
responseError, responseOffset, logStartOffset);
+    }
+
+    void sendIdempotentProducerResponse(
+        int expectedEpoch,
+        int expectedSequence,
+        TopicPartition tp,
+        Errors responseError,
+        long responseOffset,
+        long logStartOffset
+    ) {
         client.respond(body -> {
             ProduceRequest produceRequest = (ProduceRequest) body;
             assertTrue(produceRequest.hasIdempotentRecords());
-
-            MemoryRecords records = 
produceRequest.partitionRecordsOrFail().get(tp0);
+            MemoryRecords records = 
produceRequest.partitionRecordsOrFail().get(tp);
             Iterator<MutableRecordBatch> batchIterator = 
records.batches().iterator();
             RecordBatch firstBatch = batchIterator.next();
             assertFalse(batchIterator.hasNext());
+            if (expectedEpoch > -1)
+                assertEquals((short) expectedEpoch, 
firstBatch.producerEpoch());
             assertEquals(expectedSequence, firstBatch.baseSequence());
             return true;
         }, produceResponse(tp, responseOffset, responseError, 0, 
logStartOffset, null));
@@ -2666,15 +2907,29 @@ public class SenderTest {
     }
     
     private void setupWithTransactionState(TransactionManager 
transactionManager) {
-        setupWithTransactionState(transactionManager, false, null, true);
+        setupWithTransactionState(transactionManager, false, null, true, 
Integer.MAX_VALUE);
     }
 
     private void setupWithTransactionState(TransactionManager 
transactionManager, boolean guaranteeOrder, BufferPool customPool) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, true);
+        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, true, Integer.MAX_VALUE);
+    }
+
+    private void setupWithTransactionState(
+        TransactionManager transactionManager,
+        boolean guaranteeOrder,
+        BufferPool customPool,
+        boolean updateMetadata
+    ) {
+        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, updateMetadata, Integer.MAX_VALUE);
     }
 
-    private void setupWithTransactionState(TransactionManager 
transactionManager, boolean guaranteeOrder, BufferPool customPool, boolean 
updateMetadata) {
-        int deliveryTimeoutMs = 1500;
+    private void setupWithTransactionState(
+        TransactionManager transactionManager,
+        boolean guaranteeOrder,
+        BufferPool customPool,
+        boolean updateMetadata,
+        int retries
+    ) {
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
         MetricConfig metricConfig = new 
MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
@@ -2682,10 +2937,10 @@ public class SenderTest {
         BufferPool pool = (customPool == null) ? new BufferPool(totalSize, 
batchSize, metrics, time, metricGrpName) : customPool;
 
         this.accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.NONE, 0, 0L,
-                deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, 
transactionManager, pool);
+            DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, 
transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, 
this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
-                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, 
REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+            retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
         metadata.add("test", time.milliseconds());
         if (updateMetadata)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index ee3f892..3464759 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -685,6 +685,7 @@ public class TransactionManagerTest {
                 Errors.NONE, 500L, time.milliseconds(), 0L);
         transactionManager.handleCompletedBatch(b2, b2Response);
 
+        transactionManager.maybeUpdateProducerIdAndEpoch(tp0);
         assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
         assertFalse(transactionManager.lastAckedSequence(tp0).isPresent());
         assertNull(transactionManager.nextBatchBySequence(tp0));
@@ -693,6 +694,7 @@ public class TransactionManagerTest {
     private ProducerBatch writeIdempotentBatchWithValue(TransactionManager 
manager,
                                                         TopicPartition tp,
                                                         String value) {
+        manager.maybeUpdateProducerIdAndEpoch(tp);
         int seq = manager.sequenceNumber(tp);
         manager.incrementSequenceNumber(tp, 1);
         ProducerBatch batch = batchWithValue(tp, value);
@@ -3117,6 +3119,7 @@ public class TransactionManagerTest {
         tp1b2.done(500L, b1AppendTime, null);
         transactionManager.handleCompletedBatch(tp1b2, t1b2Response);
 
+        transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
         assertFalse(transactionManager.hasInflightBatches(tp1));
         assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
 
@@ -3131,6 +3134,7 @@ public class TransactionManagerTest {
         tp1b3.done(500L, b1AppendTime, null);
         transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
 
+        transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
         assertFalse(transactionManager.hasInflightBatches(tp1));
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
@@ -3239,6 +3243,7 @@ public class TransactionManagerTest {
         tp1b2.done(500L, b1AppendTime, null);
         transactionManager.handleCompletedBatch(tp1b2, t1b2Response);
 
+        transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
         assertFalse(transactionManager.hasInflightBatches(tp1));
         assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
 

Reply via email to