msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1340370371


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -156,6 +157,8 @@ public class SenderTest {
     private SenderMetricsRegistry senderMetricsRegistry = null;
     private final LogContext logContext = new LogContext();
 
+    private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   Inclined to keep it, since it helps improve the readability of test logs when a test fails. By default, logging is off; see the similar comment below for details.
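
   For context, a minimal hedged sketch of how such a `LogContext`-derived logger is used in a test (the test body and message are illustrative, not from the PR):
   
   ```java
   import org.apache.kafka.common.utils.LogContext;
   import org.slf4j.Logger;
   
   public class SenderTest {
       private final LogContext logContext = new LogContext();
       // logger() returns an slf4j Logger; constructing LogContext with a
       // prefix string would additionally tag every message, which helps
       // attribute interleaved output when reading a failing test's logs.
       private final Logger log = logContext.logger(SenderTest.class);
   
       void exampleStep() {
           // Illustrative call: emitted only when test logging is enabled
           // (it is off by default, per the log4j configuration noted below).
           log.trace("completed produce request for partition {}", 0);
       }
   }
   ```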



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -96,6 +98,8 @@ public class RecordAccumulatorTest {
     private final long maxBlockTimeMs = 1000;
     private final LogContext logContext = new LogContext();
 
+    private final Logger log = logContext.logger(RecordAccumulatorTest.class);

Review Comment:
   I am inclined to keep it, as it helps improve the readability of test logs if/when a test fails.
   By default, logging is turned off in `clients/src/test/resources/log4j.properties`.
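
   For illustration, a hedged sketch of the kind of log4j 1.x setting that keeps test logging silent by default (the file's actual contents may differ):
   
   ```properties
   # Illustrative excerpt, not the actual file: an OFF root logger threshold
   # suppresses test output unless a developer lowers it locally.
   log4j.rootLogger=OFF, stdout
   
   log4j.appender.stdout=org.apache.log4j.ConsoleAppender
   log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
   log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
   ```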



##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -20,6 +20,7 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;

Review Comment:
   @dajac this is removed now, thanks!



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   > least one attempt
   
   implies a retry.
   
   > the above epoch comparison is false
   
   So the leader epoch is still the same.
   
   Consider a case where the leader changed at attempt=5 to epoch=100. maybeUpdateLeaderEpoch() should detect a leader change even when called again at attempt=5 with the same epoch=100, since that is the same attempt in which the leader change was detected.
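
   To make the intended semantics concrete, here is a hedged sketch of the check described above (an illustrative standalone class, not the PR's exact code; the field and method names mirror the diff):
   
   ```java
   // Illustrative sketch of the leader-change check described above, using
   // the fields from the diff (currentLeaderEpoch, leaderChangedAttempts)
   // and an attempts() counter; the wrapper class itself is hypothetical.
   class LeaderChangeTracker {
       static final int UNKNOWN_LEADER_EPOCH = -1;
   
       private int currentLeaderEpoch = UNKNOWN_LEADER_EPOCH;
       private int leaderChangedAttempts = -1;
       private int attempts = 0;
   
       int attempts() {
           return attempts;
       }
   
       void recordRetry() {
           attempts++;
       }
   
       boolean hasLeaderChanged(int latestLeaderEpoch) {
           boolean leaderChanged = false;
           // Leader-change detection only makes sense from the first retry on.
           if (attempts() >= 1) {
               if (currentLeaderEpoch != latestLeaderEpoch) {
                   // The epoch moved: a new leader was seen on this attempt.
                   leaderChangedAttempts = attempts();
                   leaderChanged = true;
               } else {
                   // Same epoch: still report a change while we are in the
                   // same attempt that first observed the new leader. E.g.
                   // the leader changed at attempt=5 to epoch=100; a second
                   // call at attempt=5 with epoch=100 must still return true.
                   leaderChanged = (leaderChangedAttempts == attempts());
               }
           }
           currentLeaderEpoch = latestLeaderEpoch;
           return leaderChanged;
       }
   }
   ```
   
   A call at a later attempt with the same epoch then returns false, which matches the "it's only a leader change until the first attempt is made with this leader" comment in the diff.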


