msn-tldr commented on code in PR #14384: URL: https://github.com/apache/kafka/pull/14384#discussion_r1340370371
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -156,6 +157,8 @@ public class SenderTest {
     private SenderMetricsRegistry senderMetricsRegistry = null;
     private final LogContext logContext = new LogContext();

+    private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   I am inclined to keep it, since it helps improve the readability of test logs when a test fails. Logging is off by default; see the similar comment below for details.

##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -96,6 +98,8 @@ public class RecordAccumulatorTest {
     private final long maxBlockTimeMs = 1000;
     private final LogContext logContext = new LogContext();

+    private final Logger log = logContext.logger(RecordAccumulatorTest.class);

Review Comment:
   I am inclined to keep it, as it helps improve the readability of test logs if/when a test fails. By default, logging is turned off in `clients/src/resources/log4j.properties`.

##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -20,6 +20,7 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;

Review Comment:
   @dajac this is removed now, thanks!

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }

+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   > least one attempt implies a retry.
   > the above epoch comparison is false

   So the leader epoch is still the same.
   Consider a leader change detected at attempt=5, to epoch=100. maybeUpdateLeaderEpoch() should still report a leader change when called again at attempt=5 with the same epoch=100, because this is the same attempt in which the leader change was detected.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
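
For reference, the attempt=5 / epoch=100 scenario from the last review comment can be sketched as below. This is only an illustration under stated assumptions, not the code in the PR: `LeaderChangeSketch` is a hypothetical stand-in for `ProducerBatch`, the body of the `else` branch is inferred from the comment (the quoted hunk is cut off before it), and updating `currentLeaderEpoch` inside the method is also an assumption.

```java
// Hypothetical stand-in for ProducerBatch; only the fields relevant to the discussion.
class LeaderChangeSketch {
    static final int UNKNOWN_LEADER_EPOCH = -1;

    int attempts = 0;                               // number of send attempts made so far
    int currentLeaderEpoch = UNKNOWN_LEADER_EPOCH;  // epoch seen at the last detected change
    int leaderChangedAttempts = -1;                 // attempt at which the change was detected

    boolean hasLeaderChanged(int latestLeaderEpoch) {
        boolean leaderChanged = false;
        // Checking for a leader change only makes sense from the first retry onwards.
        if (attempts >= 1) {
            if (currentLeaderEpoch != latestLeaderEpoch) {
                // A different epoch counts as a leader change.
                leaderChangedAttempts = attempts;
                currentLeaderEpoch = latestLeaderEpoch; // assumption: epoch is recorded here
                leaderChanged = true;
            } else {
                // Assumption inferred from the review comment: with an unchanged epoch,
                // it is still a leader change within the attempt that detected it.
                leaderChanged = attempts == leaderChangedAttempts;
            }
        }
        return leaderChanged;
    }

    public static void main(String[] args) {
        LeaderChangeSketch batch = new LeaderChangeSketch();
        batch.attempts = 5;
        System.out.println(batch.hasLeaderChanged(100)); // true: epoch changed to 100
        System.out.println(batch.hasLeaderChanged(100)); // true: same attempt, same epoch
        batch.attempts = 6;
        System.out.println(batch.hasLeaderChanged(100)); // false: an attempt was made with this leader
    }
}
```

Under these assumptions, repeated calls within the same attempt give a consistent answer, which is the behaviour the comment argues for.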