This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04a77120c9c MINOR: Move the ELR enabled check to the 
PartitionChangeBuilder constructor (#20716)
04a77120c9c is described below

commit 04a77120c9cdc5e752363825df296230da203900
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Feb 12 14:41:46 2026 -0800

    MINOR: Move the ELR enabled check to the PartitionChangeBuilder constructor 
(#20716)
    
    Whether the ELR is enabled is critical to build the partition change.
    Now make it a part of the change builder constructor to make sure we
    don't forget to set it.
    
    Reviewers: Artem Livshits <[email protected]>, Jun Rao 
<[email protected]>
---
 .../kafka/controller/PartitionChangeBuilder.java   | 10 +--
 .../controller/ReplicationControlManager.java      | 29 ++++-----
 .../controller/PartitionChangeBuilderTest.java     | 73 ++++++++++------------
 3 files changed, 50 insertions(+), 62 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index ebeab43b4f7..81a45038cd5 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -109,14 +109,15 @@ public class PartitionChangeBuilder {
         int partitionId,
         IntPredicate isAcceptableLeader,
         MetadataVersion metadataVersion,
-        int minISR
+        int minISR,
+        boolean eligibleLeaderReplicasEnabled
     ) {
         this.partition = partition;
         this.topicId = topicId;
         this.partitionId = partitionId;
         this.isAcceptableLeader = isAcceptableLeader;
         this.metadataVersion = metadataVersion;
-        this.eligibleLeaderReplicasEnabled = false;
+        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
         this.minISR = minISR;
 
         this.targetIsr = Replicas.toList(partition.isr);
@@ -176,11 +177,6 @@ public class PartitionChangeBuilder {
         return this;
     }
 
-    public PartitionChangeBuilder setEligibleLeaderReplicasEnabled(boolean 
eligibleLeaderReplicasEnabled) {
-        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
-        return this;
-    }
-
     public PartitionChangeBuilder 
setUseLastKnownLeaderInBalancedRecovery(boolean 
useLastKnownLeaderInBalancedRecovery) {
         this.useLastKnownLeaderInBalancedRecovery = 
useLastKnownLeaderInBalancedRecovery;
         return this;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 63c3bb78765..316213bbc04 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1133,9 +1133,9 @@ public class ReplicationControlManager {
                     partitionId,
                     new LeaderAcceptor(clusterControl, partition),
                     featureControl.metadataVersionOrThrow(),
-                    getTopicEffectiveMinIsr(topic.name)
-                )
-                    
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
+                    getTopicEffectiveMinIsr(topic.name),
+                    featureControl.isElrFeatureEnabled()
+                );
                 if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
@@ -1619,10 +1619,10 @@ public class ReplicationControlManager {
             partitionId,
             new LeaderAcceptor(clusterControl, partition),
             featureControl.metadataVersionOrThrow(),
-            getTopicEffectiveMinIsr(topic)
+            getTopicEffectiveMinIsr(topic),
+            featureControl.isElrFeatureEnabled()
         )
             .setElection(election)
-            
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
             .setDefaultDirProvider(clusterDescriber)
             .build();
         if (record.isEmpty()) {
@@ -1783,10 +1783,10 @@ public class ReplicationControlManager {
                 topicPartition.partitionId(),
                 new LeaderAcceptor(clusterControl, partition),
                 featureControl.metadataVersionOrThrow(),
-                getTopicEffectiveMinIsr(topic.name)
+                getTopicEffectiveMinIsr(topic.name),
+                featureControl.isElrFeatureEnabled()
             )
                 .setElection(PartitionChangeBuilder.Election.PREFERRED)
-                
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
                 .setDefaultDirProvider(clusterDescriber)
                 .build().ifPresent(records::add);
         }
@@ -2061,9 +2061,9 @@ public class ReplicationControlManager {
                 topicIdPart.partitionId(),
                 new LeaderAcceptor(clusterControl, partition, 
isAcceptableLeader),
                 featureControl.metadataVersionOrThrow(),
-                getTopicEffectiveMinIsr(topic.name)
+                getTopicEffectiveMinIsr(topic.name),
+                featureControl.isElrFeatureEnabled()
             );
-            
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
             if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                 builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
             }
@@ -2181,9 +2181,9 @@ public class ReplicationControlManager {
             tp.partitionId(),
             new LeaderAcceptor(clusterControl, part),
             featureControl.metadataVersionOrThrow(),
-            getTopicEffectiveMinIsr(topicName)
+            getTopicEffectiveMinIsr(topicName),
+            featureControl.isElrFeatureEnabled()
         );
-        
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
         if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
@@ -2247,9 +2247,9 @@ public class ReplicationControlManager {
             tp.partitionId(),
             new LeaderAcceptor(clusterControl, part),
             featureControl.metadataVersionOrThrow(),
-            getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
+            getTopicEffectiveMinIsr(topics.get(tp.topicId()).name),
+            featureControl.isElrFeatureEnabled()
         );
-        
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
         if (!reassignment.replicas().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.replicas());
         }
@@ -2330,7 +2330,8 @@ public class ReplicationControlManager {
                                     partitionIndex,
                                     new LeaderAcceptor(clusterControl, 
partitionRegistration),
                                     featureControl.metadataVersionOrThrow(),
-                                    getTopicEffectiveMinIsr(topicName)
+                                    getTopicEffectiveMinIsr(topicName),
+                                    featureControl.isElrFeatureEnabled()
                             )
                                     .setDirectory(brokerId, dirId)
                                     .setDefaultDirProvider(clusterDescriber)
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index e901079ec35..0902e71e11b 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -133,8 +133,8 @@ public class PartitionChangeBuilderTest {
                 0,
                 r -> r != 3,
                 metadataVersion,
-                2).
-                
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+                2,
+                metadataVersion.isElrSupported()).
                 setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
@@ -171,8 +171,8 @@ public class PartitionChangeBuilderTest {
                 0,
                 r -> r != 3,
                 metadataVersionForPartitionChangeRecordVersion(version),
-                2).
-                setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+                2,
+                isElrEnabled(version)).
                 setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
@@ -198,8 +198,8 @@ public class PartitionChangeBuilderTest {
                 0,
                 __ -> true,
                 metadataVersionForPartitionChangeRecordVersion(version),
-                2).
-                setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+                2,
+                isElrEnabled(version)).
                 setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
@@ -240,13 +240,11 @@ public class PartitionChangeBuilderTest {
             
metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
         if (metadataVersion.isElrSupported()) {
             return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, 
r -> r == 1,
-                    metadataVersion, 2).
-                     setEligibleLeaderReplicasEnabled(true).
+                    metadataVersion, 2, true).
                      setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
         } else {
             return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 
0, r -> r == 1,
-                    metadataVersion, 2).
-                     setEligibleLeaderReplicasEnabled(false).
+                    metadataVersion, 2, false).
                      setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
         }
     }
@@ -385,8 +383,8 @@ public class PartitionChangeBuilderTest {
             0,
             r -> true,
             metadataVersion,
-            2).
-            setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+            2,
+            metadataVersion.isElrSupported()).
             setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
             setTargetReplicas(List.of());
         PartitionChangeRecord record = new PartitionChangeRecord();
@@ -622,7 +620,8 @@ public class PartitionChangeBuilderTest {
             0,
             brokerId -> false,
             metadataVersion,
-            2
+            2,
+            metadataVersion.isElrSupported()
         );
         // Set the target ISR to empty to indicate that the last leader is 
offline
         offlineBuilder.setTargetIsrWithBrokerStates(List.of());
@@ -648,7 +647,8 @@ public class PartitionChangeBuilderTest {
             0,
             brokerId -> true,
             metadataVersion,
-            2
+            2,
+            metadataVersion.isElrSupported()
         );
 
         // The only broker in the ISR is elected leader and stays in the 
recovering
@@ -688,7 +688,8 @@ public class PartitionChangeBuilderTest {
             0,
             brokerId -> brokerId == leaderId,
             metadataVersion,
-            2
+            2,
+            metadataVersion.isElrSupported()
         ).setElection(Election.UNCLEAN);
         // The partition should stay as recovering
         PartitionChangeRecord changeRecord = (PartitionChangeRecord) 
onlineBuilder
@@ -754,7 +755,8 @@ public class PartitionChangeBuilderTest {
             0,
             isValidLeader,
             MetadataVersion.MINIMUM_VERSION,
-            2
+            2,
+            MetadataVersion.MINIMUM_VERSION.isElrSupported()
         );
 
         // Before we build the new PartitionChangeBuilder, confirm the current 
leader is 0.
@@ -792,9 +794,8 @@ public class PartitionChangeBuilderTest {
             .build();
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> r != 3,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
isElrEnabled(version))
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
@@ -845,9 +846,8 @@ public class PartitionChangeBuilderTest {
         // No replica is acceptable as leader, so election yields NO_LEADER.
         // We intentionally do not change target ISR so record.isr remains 
null.
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> false,
-            metadataVersionForPartitionChangeRecordVersion(version), 3)
+            metadataVersionForPartitionChangeRecordVersion(version), 3, 
isElrEnabled(version))
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(true);
 
@@ -886,9 +886,8 @@ public class PartitionChangeBuilderTest {
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
         // Min ISR is 3.
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> r != 3,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
isElrEnabled(version))
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
@@ -932,9 +931,8 @@ public class PartitionChangeBuilderTest {
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
         // Min ISR is 3.
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> r != 3,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
isElrEnabled(version))
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
@@ -984,9 +982,8 @@ public class PartitionChangeBuilderTest {
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
         // Min ISR is 3.
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> r != 3,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
isElrEnabled(version))
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
@@ -1031,7 +1028,7 @@ public class PartitionChangeBuilderTest {
                 setPartitionEpoch(200).
                 build();
         Optional<ApiMessageAndVersion> built = new 
PartitionChangeBuilder(registration, FOO_ID,
-                0, r -> true, MetadataVersion.IBP_3_7_IV2, 2).
+                0, r -> true, MetadataVersion.IBP_3_7_IV2, 2, 
MetadataVersion.IBP_3_7_IV2.isElrSupported()).
                 setTargetReplicas(List.of(3, 1, 5, 4)).
                 setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")).
                 setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
@@ -1069,7 +1066,7 @@ public class PartitionChangeBuilderTest {
                 setPartitionEpoch(200).
                 build();
         Optional<ApiMessageAndVersion> built = new 
PartitionChangeBuilder(registration, FOO_ID,
-                0, r -> true, MetadataVersion.latestTesting(), 2).
+                0, r -> true, MetadataVersion.latestTesting(), 2, 
MetadataVersion.latestTesting().isElrSupported()).
                 setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
                 setDirectory(1, DirectoryId.LOST).
                 setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
@@ -1107,9 +1104,8 @@ public class PartitionChangeBuilderTest {
 
         // Make replica 1 offline.
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> r != 1,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
isElrEnabled(version))
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
             .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled)
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
 
@@ -1156,9 +1152,8 @@ public class PartitionChangeBuilderTest {
 
         // Mark all the replicas offline.
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> false,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
true)
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(true)
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
 
@@ -1183,9 +1178,8 @@ public class PartitionChangeBuilderTest {
         if (lastKnownLeaderEnabled) {
             assertArrayEquals(new int[]{1}, partition.lastKnownElr, 
partition.toString());
             builder = new PartitionChangeBuilder(partition, topicId, 0, r -> 
false,
-                    metadataVersionForPartitionChangeRecordVersion(version), 3)
+                    metadataVersionForPartitionChangeRecordVersion(version), 
3, true)
                 .setElection(Election.PREFERRED)
-                .setEligibleLeaderReplicasEnabled(true)
                 .setUncleanShutdownReplicas(List.of(2))
                 .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
                 
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
@@ -1214,11 +1208,10 @@ public class PartitionChangeBuilderTest {
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
 
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> true,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
true)
             .setElection(Election.PREFERRED)
             .setUseLastKnownLeaderInBalancedRecovery(true)
-            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
-            .setEligibleLeaderReplicasEnabled(true);
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
 
         builder.setTargetIsr(List.of());
 
@@ -1261,9 +1254,8 @@ public class PartitionChangeBuilderTest {
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
 
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> r != 3,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
true)
             .setElection(Election.PREFERRED)
-            .setEligibleLeaderReplicasEnabled(true)
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(true);
 
@@ -1296,9 +1288,8 @@ public class PartitionChangeBuilderTest {
         Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
 
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> false,
-                metadataVersionForPartitionChangeRecordVersion(version), 3)
+                metadataVersionForPartitionChangeRecordVersion(version), 3, 
true)
             .setElection(type)
-            .setEligibleLeaderReplicasEnabled(true)
             .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(true);
 

Reply via email to