This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0457f7360e KAFKA-15109 Don't skip leader epoch bump while in migration mode (#13890)
d0457f7360e is described below

commit d0457f7360e2e3d34800fbd41de429afbaabd101
Author: David Arthur <[email protected]>
AuthorDate: Wed Jun 21 13:09:05 2023 -0400

    KAFKA-15109 Don't skip leader epoch bump while in migration mode (#13890)
    
    While in migration mode, the KRaft controller must always bump the leader epoch when shrinking an ISR.
    This is required to maintain compatibility with the ZK brokers. Without the epoch bump, the ZK brokers
    will ignore the partition state change present in the LeaderAndIsrRequest, since the request would not
    contain a new leader epoch.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../kafka/controller/PartitionChangeBuilder.java   | 15 +++++++-
 .../controller/ReplicationControlManager.java      |  9 ++++-
 .../controller/PartitionChangeBuilderTest.java     | 44 +++++++++++++++++++---
 3 files changed, 58 insertions(+), 10 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 1c110e567a5..78c1c2363d7 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -81,6 +81,8 @@ public class PartitionChangeBuilder {
     private List<Integer> targetAdding;
     private Election election = Election.ONLINE;
     private LeaderRecoveryState targetLeaderRecoveryState;
+    private boolean bumpLeaderEpochOnIsrShrink;
+
 
     public PartitionChangeBuilder(
         PartitionRegistration partition,
@@ -94,6 +96,8 @@ public class PartitionChangeBuilder {
         this.partitionId = partitionId;
         this.isAcceptableLeader = isAcceptableLeader;
         this.metadataVersion = metadataVersion;
+        this.bumpLeaderEpochOnIsrShrink = 
!metadataVersion.isSkipLeaderEpochBumpSupported();
+
         this.targetIsr = Replicas.toList(partition.isr);
         this.targetReplicas = Replicas.toList(partition.replicas);
         this.targetRemoving = Replicas.toList(partition.removingReplicas);
@@ -140,6 +144,11 @@ public class PartitionChangeBuilder {
         return this;
     }
 
+    public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean 
bumpLeaderEpochOnIsrShrink) {
+        this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+        return this;
+    }
+
     // VisibleForTesting
     static class ElectionResult {
         final int node;
@@ -273,13 +282,15 @@ public class PartitionChangeBuilder {
      * that required that the leader epoch be bump whenever the ISR shrank. In 
MV 3.6 this leader
      * bump is not required when the ISR shrinks. Note, that the leader epoch 
is never increased if
      * the ISR expanded.
+     *
+     * In MV 3.6 and beyond, if the controller is in ZK migration mode, the 
leader epoch must
+     * be bumped during ISR shrink for compatability with ZK brokers.
      */
     void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
         if (record.leader() == NO_LEADER_CHANGE) {
             if (!Replicas.contains(targetReplicas, partition.replicas)) {
                 record.setLeader(partition.leader);
-            } else if (!metadataVersion.isSkipLeaderEpochBumpSupported() &&
-                       !Replicas.contains(targetIsr, partition.isr)) {
+            } else if (bumpLeaderEpochOnIsrShrink && 
!Replicas.contains(targetIsr, partition.isr)) {
                 record.setLeader(partition.leader);
             }
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index f7d5e69ed8b..7af6ab8b317 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -995,6 +995,7 @@ public class ReplicationControlManager {
                     clusterControl::isActive,
                     featureControl.metadataVersion()
                 );
+                
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
                 if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
@@ -1382,7 +1383,7 @@ public class ReplicationControlManager {
             clusterControl::isActive,
             featureControl.metadataVersion()
         );
-        builder.setElection(election);
+        
builder.setElection(election).setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
         Optional<ApiMessageAndVersion> record = builder.build();
         if (!record.isPresent()) {
             if (electionType == ElectionType.PREFERRED) {
@@ -1517,7 +1518,8 @@ public class ReplicationControlManager {
                 clusterControl::isActive,
                 featureControl.metadataVersion()
             );
-            builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
+            builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
+                
.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
             builder.build().ifPresent(records::add);
         }
 
@@ -1738,6 +1740,7 @@ public class ReplicationControlManager {
                 isAcceptableLeader,
                 featureControl.metadataVersion()
             );
+            
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
             if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                 builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
             }
@@ -1850,6 +1853,7 @@ public class ReplicationControlManager {
             clusterControl::isActive,
             featureControl.metadataVersion()
         );
+        
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
         if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
@@ -1907,6 +1911,7 @@ public class ReplicationControlManager {
             clusterControl::isActive,
             featureControl.metadataVersion()
         );
+        
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
         if (!reassignment.replicas().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.replicas());
         }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 3374f12d7ac..c707642bebd 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -31,12 +31,14 @@ import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.function.IntPredicate;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
 import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
@@ -46,6 +48,7 @@ import static 
org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 
 @Timeout(value = 40)
@@ -200,6 +203,14 @@ public class PartitionChangeBuilderTest {
             new PartitionChangeRecord(),
             NO_LEADER_CHANGE
         );
+        testTriggerLeaderEpochBumpIfNeededLeader(
+            createFooBuilder()
+                .setTargetIsrWithBrokerStates(
+                    
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 
3, 4)))
+                .setBumpLeaderEpochOnIsrShrink(true),
+            new PartitionChangeRecord(),
+            NO_LEADER_CHANGE
+        );
         testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
             setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(),
             NO_LEADER_CHANGE);
@@ -217,6 +228,16 @@ public class PartitionChangeBuilderTest {
             new PartitionChangeRecord(),
             1
         );
+
+        // KAFKA-15109: Shrinking the ISR while in ZK migration mode does 
increase the leader epoch
+        testTriggerLeaderEpochBumpIfNeededLeader(
+            createFooBuilder()
+                .setTargetIsrWithBrokerStates(
+                    
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+                .setBumpLeaderEpochOnIsrShrink(true),
+            new PartitionChangeRecord(),
+            1
+        );
     }
 
     @Test
@@ -382,9 +403,18 @@ public class PartitionChangeBuilderTest {
         );
     }
 
+    private static Stream<Arguments> leaderRecoveryAndZkMigrationParams() {
+        return Stream.of(
+                arguments(true, true),
+                arguments(true, false),
+                arguments(false, true),
+                arguments(false, false)
+        );
+    }
+
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean 
isLeaderRecoverySupported) {
+    @MethodSource("leaderRecoveryAndZkMigrationParams")
+    public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean 
isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
         final byte noChange = (byte) -1;
         int leaderId = 1;
         LeaderRecoveryState recoveryState = LeaderRecoveryState.RECOVERING;
@@ -407,6 +437,7 @@ public class PartitionChangeBuilderTest {
             brokerId -> false,
             metadataVersion
         );
+        offlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
         // Set the target ISR to empty to indicate that the last leader is 
offline
         offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
 
@@ -432,6 +463,7 @@ public class PartitionChangeBuilderTest {
             brokerId -> true,
             metadataVersion
         );
+        onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
 
         // The only broker in the ISR is elected leader and stays in the 
recovering
         changeRecord = (PartitionChangeRecord) 
onlineBuilder.build().get().message();
@@ -445,8 +477,8 @@ public class PartitionChangeBuilderTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testUncleanSetsLeaderRecoveringState(boolean 
isLeaderRecoverySupported) {
+    @MethodSource("leaderRecoveryAndZkMigrationParams")
+    void testUncleanSetsLeaderRecoveringState(boolean 
isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
         final byte noChange = (byte) -1;
         int leaderId = 1;
         PartitionRegistration registration = new 
PartitionRegistration.Builder().
@@ -468,7 +500,7 @@ public class PartitionChangeBuilderTest {
             brokerId -> brokerId == leaderId,
             metadataVersion
         ).setElection(Election.UNCLEAN);
-
+        onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
         // The partition should stay as recovering
         PartitionChangeRecord changeRecord = (PartitionChangeRecord) 
onlineBuilder
             .build()

Reply via email to