wcarlson5 commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1346128343


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,42 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = 
CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                
 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /**
+     * It will update the leader to which this batch will be produced for the 
ongoing attempt, if a newer leader is known.
+     * @param latestLeaderEpoch latest leader's epoch.
+     */
+    void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+        if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+            log.trace("For {}, leader will be updated, currentLeaderEpoch: {}, 
attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+                this, currentLeaderEpoch, attemptsWhenLeaderLastChanged, 
latestLeaderEpoch, attempts);
+            attemptsWhenLeaderLastChanged = attempts();
+            currentLeaderEpoch = latestLeaderEpoch;
+        } else {
+            log.trace("For {}, leader wasn't updated, currentLeaderEpoch: {}, 
attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+                this, currentLeaderEpoch, attemptsWhenLeaderLastChanged, 
latestLeaderEpoch, attempts);
+        }
+    }
+
+    /**
+     * It will return true when a batch is being retried and it will be 
retried to a newer leader.
+     */
+
+    boolean hasLeaderChangedForTheOngoingRetry() {

Review Comment:
   love this, thanks!



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() 
throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, 
time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, 
time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, 
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, 
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater 
than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than 
batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a 
batch should skip-backoff(retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws 
InterruptedException {
+        int part1LeaderEpoch = 100;
+        // Create cluster metadata, partition1 being hosted by node1.
+        part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+        cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+            Collections.emptySet(), Collections.emptySet());
+        final int finalEpoch = part1LeaderEpoch;
+        metadataMock = setupMetadata(cluster, tp -> finalEpoch);
+
+        int batchSize = 10;
+        int lingerMs = 10;
+        int retryBackoffMs = 100;
+        int retryBackoffMaxMs = 1000;
+        int deliveryTimeoutMs = Integer.MAX_VALUE;
+        long totalSize = 10 * 1024;
+        String metricGrpName = "producer-metrics";
+        final RecordAccumulator accum = new RecordAccumulator(logContext, 
batchSize,
+            CompressionType.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+            deliveryTimeoutMs, metrics, metricGrpName, time, new 
ApiVersions(), null,
+            new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
+
+        // Create 1 batch(batchA) to be produced to partition1.
+        long now = time.milliseconds();
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, 
null, maxBlockTimeMs, false, now, cluster);
+
+        // 1st attempt to produce batchA, it should be ready & drained to be 
produced.
+        {
+            log.info("Running 1st attempt to produce batchA, it should be 
ready & drained to be produced.");
+            now += lingerMs + 1;
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(0, batch.attemptsWhenLeaderLastChanged());
+            // Re-enqueue batch for subsequent retries & test-cases
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is less than 
configured and no leader change, so should backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
less than configured and no leader change, so should backoff.");
+            now += 1;
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertFalse(result.readyNodes.contains(node1), "Node1 is not 
ready");
+
+            // Try to drain from node1, it should return no batches.
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                new HashSet<>(Arrays.asList(node1)), 999999 /* maxSize */, 
now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).isEmpty(),
+                "No batches ready to be drained on Node1");
+        }
+
+        // In this retry of batchA, wait-time between retries is less than 
configured and leader has changed, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
less than configured and leader has changed, so should not backoff.");
+            now += 1;
+            part1LeaderEpoch++;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, 
null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is more than 
configured and no leader change, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
more than configured and no leader change, so should not backoff");
+            now += 2 * retryBackoffMaxMs;
+            // Create cluster metadata, with the leader epoch unchanged.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, 
null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is more than 
configured and leader has changed, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
more than configured and leader has changed, so should not backoff.");
+            now += 2 * retryBackoffMaxMs;
+            part1LeaderEpoch++;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, 
null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(3, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+    }
+
+    @Test
+    public void testDrainWithANodeThatDoesntHostAnyPartitions() {

Review Comment:
   Awesome, that answers my question perfectly.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() 
throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws 
Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, 
time,
+                metricGrpName);
+            long retryBackoffMaxMs = 100L;

Review Comment:
   That's good to know. My concern came from `time.sleep(2 * 
retryBackoffMaxMs);` down on line 3223. I'm not sure that is avoidable, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to