soarez commented on code in PR #15810:
URL: https://github.com/apache/kafka/pull/15810#discussion_r1581762636


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) {
     }
 
     /**
-     * Trigger a leader epoch bump if one is needed.
-     *
-     * We need to bump the leader epoch if:
-     * 1. The leader changed, or
-     * 2. The new replica list does not contain all the nodes that the old replica list did.
-     *
-     * Changes that do NOT fall in any of these categories will increase the partition epoch, but
-     * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
-     * always increase as well; there is no case where the partition epoch increases more slowly
-     * than the leader epoch.
+     * Trigger a leader epoch bump if one is needed because of replica reassignment.
      *
-     * If the PartitionChangeRecord sets the leader field to something other than
-     * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
-     * case 1. In this function, we check for cases 2 and 3, and handle them by manually
-     * setting record.leader to the current leader.
-     *
-     * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
-     * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
-     * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
-     * the ISR expanded.
+     * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
+     * case where the partition epoch increases more slowly than the leader epoch.
+     */
+    void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {

Review Comment:
   This issue predates the PR, but I want to say it anyway: it's not great that
both of these methods are named and documented in a way that conveys a leader
epoch bump, when all they actually do is set the leader value in the change
record. One needs to know about
`PartitionRegistration#merge(PartitionChangeRecord)` to make the link between
setting the leader on the `PartitionChangeRecord` and provoking a leader epoch
bump.
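
   To illustrate the indirection described above, here is a minimal sketch of
how setting the leader field on a change record can translate into a leader
epoch bump at merge time. It is a simplified model, not the actual Kafka code:
the classes `ChangeRecord` and `Registration` are hypothetical stand-ins for
`PartitionChangeRecord` and `PartitionRegistration`, and the sentinel value
used for `NO_LEADER_CHANGE` is an assumption.

```java
public class LeaderEpochSketch {
    // Sentinel meaning "this record does not touch the leader" (value assumed here).
    static final int NO_LEADER_CHANGE = -2;

    /** Hypothetical stand-in for PartitionChangeRecord: models only the leader field. */
    static final class ChangeRecord {
        int leader = NO_LEADER_CHANGE;
    }

    /** Hypothetical stand-in for PartitionRegistration: immutable partition state. */
    static final class Registration {
        final int leader;
        final int leaderEpoch;
        final int partitionEpoch;

        Registration(int leader, int leaderEpoch, int partitionEpoch) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        /** Applies a change record; the partition epoch grows on every change. */
        Registration merge(ChangeRecord record) {
            if (record.leader == NO_LEADER_CHANGE) {
                // Leader field untouched: the leader epoch stays as it is.
                return new Registration(leader, leaderEpoch, partitionEpoch + 1);
            }
            // Leader field set, even to the current leader: the leader epoch is bumped.
            return new Registration(record.leader, leaderEpoch + 1, partitionEpoch + 1);
        }
    }

    public static void main(String[] args) {
        Registration before = new Registration(1, 5, 10);

        // A change that leaves the leader field alone.
        Registration noBump = before.merge(new ChangeRecord());
        System.out.println(noBump.leaderEpoch + " " + noBump.partitionEpoch); // prints "5 11"

        // What the "trigger" methods do: set the leader field to the current leader.
        ChangeRecord sameLeader = new ChangeRecord();
        sameLeader.leader = before.leader;
        Registration bumped = before.merge(sameLeader);
        System.out.println(bumped.leaderEpoch + " " + bumped.partitionEpoch); // prints "6 11"
    }
}
```

   In this model the "trigger" methods never touch an epoch themselves; the
bump only becomes visible once the record is merged, which is the link a reader
has to know about.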



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) {
     }
 
     /**
-     * Trigger a leader epoch bump if one is needed.
-     *
-     * We need to bump the leader epoch if:
-     * 1. The leader changed, or
-     * 2. The new replica list does not contain all the nodes that the old replica list did.
-     *
-     * Changes that do NOT fall in any of these categories will increase the partition epoch, but
-     * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
-     * always increase as well; there is no case where the partition epoch increases more slowly
-     * than the leader epoch.
+     * Trigger a leader epoch bump if one is needed because of replica reassignment.
      *
-     * If the PartitionChangeRecord sets the leader field to something other than
-     * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
-     * case 1. In this function, we check for cases 2 and 3, and handle them by manually
-     * setting record.leader to the current leader.
-     *
-     * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
-     * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
-     * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
-     * the ISR expanded.
+     * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
+     * case where the partition epoch increases more slowly than the leader epoch.
+     */
+    void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {
+        if (record.leader() != NO_LEADER_CHANGE) {
+            // The leader is already changing, so there will already be a leader epoch bump.
+            return;
+        }
+        if (!Replicas.contains(targetReplicas, partition.replicas)) {
+            // If the new replica list does not contain all the brokers that the old one did,
+            // ensure that there will be a leader epoch bump by setting the leader field.
+            record.setLeader(partition.leader);
+        }
+    }
+
+    /**
+     * Trigger a leader epoch bump if one is needed because of an ISR shrink.
      *
-     * In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must
-     * be bumped during ISR shrink for compatability with ZK brokers.
+     * Note that it's important to call this function only after we have set the ISR field in
+     * the PartitionChangeRecord.
      */
-    void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
-        if (record.leader() == NO_LEADER_CHANGE) {
-            boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
-
-            if (!Replicas.contains(targetReplicas, partition.replicas)) {
-                // Reassignment
-                record.setLeader(partition.leader);
-            } else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) {
-                // ISR shrink
-                record.setLeader(partition.leader);
-            }
+    void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) {
+        if (!(metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled)) {
+            // We only need to bump the leader epoch on an ISR shrink in two cases:
+            //
+            // 1. In older metadata versions before 3.6, there was a bug (KAFKA-15021) in the
+            //    broker replica manager that required that the leader epoch be bumped whenever
+            //    the ISR shrank. (This was never necessary for EXPANSIONS, only SHRINKS.)
+            //
+            // 2. During ZK migration, we bump the leader epoch during all ISR shrinks, in order
+            // to maintain compatibility with migrating brokers that are still in ZK mode.
+            //
+            // If we're not in either case, we can exit here.
+            return;
+        }
+        if (record.leader() != NO_LEADER_CHANGE) {
+            // The leader is already changing, so there will already be a leader epoch bump.
+            return;
+        }
+        if (record.isr() == null) {
+            // The ISR is not changing.
+            return;
+        }
+        if (!Replicas.contains(record.isr(), partition.isr)) {
+            // If the new ISR list does not contain all the brokers that the old one did,
+            // ensure that there will be a leader epoch bump by setting the leader field.
+            record.setLeader(partition.leader);
         }
     }
 
-    private void completeReassignmentIfNeeded() {
+    /**
+     * @return true if the reassignment was completed; false otherwise.
+     */
+    private boolean completeReassignmentIfNeeded() {

Review Comment:
   This change doesn't seem to be necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
