This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc7d912e8bb KAFKA-15109 Ensure the leader epoch bump occurs for older 
MetadataVersions (#13910)
fc7d912e8bb is described below

commit fc7d912e8bb856500fa27b7455e6eff098c08196
Author: David Arthur <[email protected]>
AuthorDate: Tue Jun 27 11:49:20 2023 -0400

    KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions 
(#13910)
    
    This fixes a regression introduced by the previous KAFKA-15109 commit 
(d0457f7360 on trunk).
    
    Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio 
<[email protected]>
---
 .../kafka/controller/PartitionChangeBuilder.java   | 12 ++++--
 .../controller/ReplicationControlManager.java      | 12 +++---
 .../controller/PartitionChangeBuilderTest.java     | 50 +++++++++++++++++++---
 .../kafka/server/common/MetadataVersion.java       |  4 +-
 4 files changed, 59 insertions(+), 19 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 78c1c2363d7..78d22967e2a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -81,7 +81,7 @@ public class PartitionChangeBuilder {
     private List<Integer> targetAdding;
     private Election election = Election.ONLINE;
     private LeaderRecoveryState targetLeaderRecoveryState;
-    private boolean bumpLeaderEpochOnIsrShrink;
+    private boolean zkMigrationEnabled;
 
 
     public PartitionChangeBuilder(
@@ -96,7 +96,7 @@ public class PartitionChangeBuilder {
         this.partitionId = partitionId;
         this.isAcceptableLeader = isAcceptableLeader;
         this.metadataVersion = metadataVersion;
-        this.bumpLeaderEpochOnIsrShrink = 
!metadataVersion.isSkipLeaderEpochBumpSupported();
+        this.zkMigrationEnabled = false;
 
         this.targetIsr = Replicas.toList(partition.isr);
         this.targetReplicas = Replicas.toList(partition.replicas);
@@ -144,8 +144,8 @@ public class PartitionChangeBuilder {
         return this;
     }
 
-    public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean 
bumpLeaderEpochOnIsrShrink) {
-        this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+    public PartitionChangeBuilder setZkMigrationEnabled(boolean 
zkMigrationEnabled) {
+        this.zkMigrationEnabled = zkMigrationEnabled;
         return this;
     }
 
@@ -288,9 +288,13 @@ public class PartitionChangeBuilder {
      */
     void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
         if (record.leader() == NO_LEADER_CHANGE) {
+            boolean bumpLeaderEpochOnIsrShrink = 
metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
+
             if (!Replicas.contains(targetReplicas, partition.replicas)) {
+                // Reassignment
                 record.setLeader(partition.leader);
             } else if (bumpLeaderEpochOnIsrShrink && 
!Replicas.contains(targetIsr, partition.isr)) {
+                // ISR shrink
                 record.setLeader(partition.leader);
             }
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 7af6ab8b317..f8631e4560b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -995,7 +995,7 @@ public class ReplicationControlManager {
                     clusterControl::isActive,
                     featureControl.metadataVersion()
                 );
-                
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+                
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
                 if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
@@ -1383,7 +1383,7 @@ public class ReplicationControlManager {
             clusterControl::isActive,
             featureControl.metadataVersion()
         );
-        
builder.setElection(election).setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+        
builder.setElection(election).setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
         Optional<ApiMessageAndVersion> record = builder.build();
         if (!record.isPresent()) {
             if (electionType == ElectionType.PREFERRED) {
@@ -1519,7 +1519,7 @@ public class ReplicationControlManager {
                 featureControl.metadataVersion()
             );
             builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
-                
.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+                .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
             builder.build().ifPresent(records::add);
         }
 
@@ -1740,7 +1740,7 @@ public class ReplicationControlManager {
                 isAcceptableLeader,
                 featureControl.metadataVersion()
             );
-            
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+            
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
             if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                 builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
             }
@@ -1853,7 +1853,7 @@ public class ReplicationControlManager {
             clusterControl::isActive,
             featureControl.metadataVersion()
         );
-        
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+        builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
         if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
@@ -1911,7 +1911,7 @@ public class ReplicationControlManager {
             clusterControl::isActive,
             featureControl.metadataVersion()
         );
-        
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+        builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
         if (!reassignment.replicas().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.replicas());
         }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index c707642bebd..c2b07956d86 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -93,7 +93,11 @@ public class PartitionChangeBuilderTest {
     private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
 
     private static PartitionChangeBuilder createFooBuilder() {
-        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, 
MetadataVersion.latest());
+        return createFooBuilder(MetadataVersion.latest());
+    }
+
+    private static PartitionChangeBuilder createFooBuilder(MetadataVersion 
metadataVersion) {
+        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, 
metadataVersion);
     }
 
     private static final PartitionRegistration BAR = new 
PartitionRegistration.Builder().
@@ -203,11 +207,12 @@ public class PartitionChangeBuilderTest {
             new PartitionChangeRecord(),
             NO_LEADER_CHANGE
         );
+        // Expanding the ISR during migration doesn't increase leader epoch
         testTriggerLeaderEpochBumpIfNeededLeader(
             createFooBuilder()
                 .setTargetIsrWithBrokerStates(
                     
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 
3, 4)))
-                .setBumpLeaderEpochOnIsrShrink(true),
+                .setZkMigrationEnabled(true),
             new PartitionChangeRecord(),
             NO_LEADER_CHANGE
         );
@@ -228,13 +233,44 @@ public class PartitionChangeBuilderTest {
             new PartitionChangeRecord(),
             1
         );
+    }
 
-        // KAFKA-15109: Shrinking the ISR while in ZK migration mode does 
increase the leader epoch
+    @Test
+    public void testLeaderEpochBumpZkMigration() {
+        // KAFKA-15109: Shrinking the ISR while in ZK migration mode requires 
a leader epoch bump
         testTriggerLeaderEpochBumpIfNeededLeader(
             createFooBuilder()
                 .setTargetIsrWithBrokerStates(
                     
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
-                .setBumpLeaderEpochOnIsrShrink(true),
+                .setZkMigrationEnabled(true),
+            new PartitionChangeRecord(),
+            1
+        );
+
+        testTriggerLeaderEpochBumpIfNeededLeader(
+            createFooBuilder()
+                .setTargetIsrWithBrokerStates(
+                    
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+                .setZkMigrationEnabled(false),
+            new PartitionChangeRecord(),
+            NO_LEADER_CHANGE
+        );
+
+        // For older MV, always expect the epoch to increase regardless of ZK 
migration
+        testTriggerLeaderEpochBumpIfNeededLeader(
+            createFooBuilder(MetadataVersion.IBP_3_5_IV2)
+                .setTargetIsrWithBrokerStates(
+                    
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+                .setZkMigrationEnabled(true),
+            new PartitionChangeRecord(),
+            1
+        );
+
+        testTriggerLeaderEpochBumpIfNeededLeader(
+            createFooBuilder(MetadataVersion.IBP_3_5_IV2)
+                .setTargetIsrWithBrokerStates(
+                    
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+                .setZkMigrationEnabled(false),
             new PartitionChangeRecord(),
             1
         );
@@ -437,7 +473,7 @@ public class PartitionChangeBuilderTest {
             brokerId -> false,
             metadataVersion
         );
-        offlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
+        offlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
         // Set the target ISR to empty to indicate that the last leader is 
offline
         offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
 
@@ -463,7 +499,7 @@ public class PartitionChangeBuilderTest {
             brokerId -> true,
             metadataVersion
         );
-        onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
+        onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
 
         // The only broker in the ISR is elected leader and stays in the 
recovering
         changeRecord = (PartitionChangeRecord) 
onlineBuilder.build().get().message();
@@ -500,7 +536,7 @@ public class PartitionChangeBuilderTest {
             brokerId -> brokerId == leaderId,
             metadataVersion
         ).setElection(Election.UNCLEAN);
-        onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
+        onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
         // The partition should stay as recovering
         PartitionChangeRecord changeRecord = (PartitionChangeRecord) 
onlineBuilder
             .build()
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 2637aef7016..0ccfcc06be1 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -263,8 +263,8 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_3_5_IV2);
     }
 
-    public boolean isSkipLeaderEpochBumpSupported() {
-        return this.isAtLeast(IBP_3_6_IV0);
+    public boolean isLeaderEpochBumpRequiredOnIsrShrink() {
+        return !this.isAtLeast(IBP_3_6_IV0);
     }
 
     public boolean isKRaftSupported() {

Reply via email to