kirktrue commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1344502781


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+        if (!latestLeaderEpoch.isPresent())
+            return false;
+
+        boolean leaderChanged = false;
+        int attempts = attempts();
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+            this, currentLeaderEpoch.isPresent() ? currentLeaderEpoch.get() : "un-initialized", attemptsWhenLeaderLastChanged, latestLeaderEpoch.get(), attempts);
+        boolean isRetry = attempts >= 1;
+        // Checking for leader change makes sense only from 1st retry onwards(i.e attempt >=1).
+        if (isRetry) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+                attemptsWhenLeaderLastChanged = attempts;
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts == attemptsWhenLeaderLastChanged;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leader has changed, oldEpoch: {}, newEpoch: {}",

Review Comment:
   nit: per @wcarlson5's 
[comment above](https://github.com/apache/kafka/pull/14384/files#r1344314253), 
the log message could be read as saying that the internal state of the batch 
was _changed_, rather than that a leader change was simply _detected_. I don't 
have a suggestion for how to reword it, though. It's fine if you want to leave 
it as is.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -842,22 +845,31 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
         return false;
     }
 
-    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
+    private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node, int maxSize, long now) {
         int size = 0;
-        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+        List<PartitionInfo> parts = metadata.fetch().partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
+        if (parts.isEmpty())
+            return ready;
         /* to make starvation less likely each node has it's own drainIndex */
         int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
+
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
             updateDrainIndex(node.idString(), drainIndex);
             drainIndex = (drainIndex + 1) % parts.size();
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;
-
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(new TopicPartition(part.topic(), part.partition()));

Review Comment:
   nit: can we reuse `tp` instead of creating a new `TopicPartition`?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   I guess I was under the false impression that `maybeUpdateLeaderEpoch()` was 
only called _once_ per attempt. I _think_ I can see how the flow of the code 
would end up calling that multiple times.
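
To make the "leader change until the first attempt is made with this leader" behavior concrete, here is a minimal, self-contained sketch. It is not the PR's code: `LeaderEpochSketch`, its `attempts` field, and `recordAttempt()` are hypothetical stand-ins for `ProducerBatch` and its `attempts()` accessor, and the final assignment to `currentLeaderEpoch` is an assumption, since the end of the method is not visible in this hunk. It only illustrates that repeated calls within the same attempt keep reporting the change, and that the answer flips once another attempt has been recorded.

```java
import java.util.Optional;

// Simplified stand-in for ProducerBatch; only the leader-epoch bookkeeping is modeled.
final class LeaderEpochSketch {
    private Optional<Integer> currentLeaderEpoch = Optional.empty();
    private int attemptsWhenLeaderLastChanged = 0;
    private int attempts = 0; // hypothetical counter; the real batch exposes attempts()

    // Hypothetical helper: called once a send attempt has actually been made.
    void recordAttempt() {
        attempts++;
    }

    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
        if (!latestLeaderEpoch.isPresent())
            return false;

        boolean leaderChanged = false;
        // A leader change is only meaningful from the first retry onwards.
        if (attempts >= 1) {
            if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
                // New epoch: remember the attempt count at which it was seen.
                attemptsWhenLeaderLastChanged = attempts;
                leaderChanged = true;
            } else {
                // Same epoch: still a "change" until an attempt is made with it.
                leaderChanged = attempts == attemptsWhenLeaderLastChanged;
            }
        }
        // Assumption: the latest epoch is stored at the end of the method.
        currentLeaderEpoch = latestLeaderEpoch;
        return leaderChanged;
    }

    public static void main(String[] args) {
        LeaderEpochSketch batch = new LeaderEpochSketch();
        System.out.println(batch.maybeUpdateLeaderEpoch(Optional.of(5))); // false: first attempt
        batch.recordAttempt();
        System.out.println(batch.maybeUpdateLeaderEpoch(Optional.of(6))); // true: retry, new epoch
        System.out.println(batch.maybeUpdateLeaderEpoch(Optional.of(6))); // true: same attempt, asked again
        batch.recordAttempt();
        System.out.println(batch.maybeUpdateLeaderEpoch(Optional.of(6))); // false: attempt already made with this leader
    }
}
```

The third call is the case in question: a second call within the same attempt sees an unchanged epoch, yet still returns true because no attempt has been made with the new leader so far.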



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {

Review Comment:
   It does assign the `currentLeaderEpoch` at the end, though, right?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -682,8 +683,9 @@ private long partitionReady(Cluster cluster, long nowMs, String topic,
             TopicPartition part = new TopicPartition(topic, entry.getKey());
             // Advance queueSizesIndex so that we properly index available
             // partitions.  Do it here so that it's done for all code paths.
-            Node leader = cluster.leaderFor(part);
-            if (leader != null && queueSizes != null) {
+
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);

Review Comment:
   ```suggestion
               Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);
               Node leader = leaderAndEpoch.leader.orElse(null);
   ```
   
   Or something like that?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -682,8 +683,9 @@ private long partitionReady(Cluster cluster, long nowMs, String topic,
             TopicPartition part = new TopicPartition(topic, entry.getKey());
             // Advance queueSizesIndex so that we properly index available
             // partitions.  Do it here so that it's done for all code paths.
-            Node leader = cluster.leaderFor(part);
-            if (leader != null && queueSizes != null) {
+
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);

Review Comment:
   Since the `leaderAndEpoch` variable isn't reassigned in this method, can we 
grab a reference to the leader `Node` as we had before? That way we avoid 
repeating the more verbose `leaderAndEpoch.leader.isPresent()` and 
`leaderAndEpoch.leader.get()` calls throughout the method.
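
A rough sketch of the difference, for illustration only. `Node` and `LeaderAndEpoch` below are hypothetical, stripped-down stand-ins for the Kafka classes (assuming `leader` is an `Optional<Node>` field, as the accessors quoted above imply), and `describe`/`describeVerbose` are made-up method names rather than anything in `RecordAccumulator`.

```java
import java.util.Optional;

final class LeaderLookupSketch {
    // Hypothetical stand-in for org.apache.kafka.common.Node.
    static final class Node {
        final int id;
        Node(int id) { this.id = id; }
    }

    // Hypothetical stand-in for Metadata.LeaderAndEpoch.
    static final class LeaderAndEpoch {
        final Optional<Node> leader;
        final Optional<Integer> epoch;
        LeaderAndEpoch(Optional<Node> leader, Optional<Integer> epoch) {
            this.leader = leader;
            this.epoch = epoch;
        }
    }

    // Verbose form: every use goes through the Optional.
    static String describeVerbose(LeaderAndEpoch leaderAndEpoch) {
        if (leaderAndEpoch.leader.isPresent())
            return "leader " + leaderAndEpoch.leader.get().id;
        return "no leader";
    }

    // Unwrap once into a local, then use plain null checks as the old code did.
    static String describe(LeaderAndEpoch leaderAndEpoch) {
        Node leader = leaderAndEpoch.leader.orElse(null);
        if (leader != null)
            return "leader " + leader.id;
        return "no leader";
    }

    public static void main(String[] args) {
        LeaderAndEpoch known = new LeaderAndEpoch(Optional.of(new Node(3)), Optional.of(7));
        LeaderAndEpoch unknown = new LeaderAndEpoch(Optional.empty(), Optional.empty());
        System.out.println(describe(known));
        System.out.println(describe(unknown));
    }
}
```

Both forms behave the same; the local `leader` reference just keeps the existing `leader != null` checks readable.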


