This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 04a77120c9c MINOR: Move the ELR enabled check to the
PartitionChangeBuilder constructor (#20716)
04a77120c9c is described below
commit 04a77120c9cdc5e752363825df296230da203900
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Feb 12 14:41:46 2026 -0800
MINOR: Move the ELR enabled check to the PartitionChangeBuilder constructor
(#20716)
Whether ELR is enabled is critical to building the partition change.
Make it a parameter of the PartitionChangeBuilder constructor so that
callers cannot forget to set it.
Reviewers: Artem Livshits <[email protected]>, Jun Rao
<[email protected]>
---
.../kafka/controller/PartitionChangeBuilder.java | 10 +--
.../controller/ReplicationControlManager.java | 29 ++++-----
.../controller/PartitionChangeBuilderTest.java | 73 ++++++++++------------
3 files changed, 50 insertions(+), 62 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index ebeab43b4f7..81a45038cd5 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -109,14 +109,15 @@ public class PartitionChangeBuilder {
int partitionId,
IntPredicate isAcceptableLeader,
MetadataVersion metadataVersion,
- int minISR
+ int minISR,
+ boolean eligibleLeaderReplicasEnabled
) {
this.partition = partition;
this.topicId = topicId;
this.partitionId = partitionId;
this.isAcceptableLeader = isAcceptableLeader;
this.metadataVersion = metadataVersion;
- this.eligibleLeaderReplicasEnabled = false;
+ this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.minISR = minISR;
this.targetIsr = Replicas.toList(partition.isr);
@@ -176,11 +177,6 @@ public class PartitionChangeBuilder {
return this;
}
- public PartitionChangeBuilder setEligibleLeaderReplicasEnabled(boolean
eligibleLeaderReplicasEnabled) {
- this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
- return this;
- }
-
public PartitionChangeBuilder
setUseLastKnownLeaderInBalancedRecovery(boolean
useLastKnownLeaderInBalancedRecovery) {
this.useLastKnownLeaderInBalancedRecovery =
useLastKnownLeaderInBalancedRecovery;
return this;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 63c3bb78765..316213bbc04 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1133,9 +1133,9 @@ public class ReplicationControlManager {
partitionId,
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topic.name)
- )
-
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
+ getTopicEffectiveMinIsr(topic.name),
+ featureControl.isElrFeatureEnabled()
+ );
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1619,10 +1619,10 @@ public class ReplicationControlManager {
partitionId,
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topic)
+ getTopicEffectiveMinIsr(topic),
+ featureControl.isElrFeatureEnabled()
)
.setElection(election)
-
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.build();
if (record.isEmpty()) {
@@ -1783,10 +1783,10 @@ public class ReplicationControlManager {
topicPartition.partitionId(),
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topic.name)
+ getTopicEffectiveMinIsr(topic.name),
+ featureControl.isElrFeatureEnabled()
)
.setElection(PartitionChangeBuilder.Election.PREFERRED)
-
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.build().ifPresent(records::add);
}
@@ -2061,9 +2061,9 @@ public class ReplicationControlManager {
topicIdPart.partitionId(),
new LeaderAcceptor(clusterControl, partition,
isAcceptableLeader),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topic.name)
+ getTopicEffectiveMinIsr(topic.name),
+ featureControl.isElrFeatureEnabled()
);
-
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -2181,9 +2181,9 @@ public class ReplicationControlManager {
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topicName)
+ getTopicEffectiveMinIsr(topicName),
+ featureControl.isElrFeatureEnabled()
);
-
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -2247,9 +2247,9 @@ public class ReplicationControlManager {
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
+ getTopicEffectiveMinIsr(topics.get(tp.topicId()).name),
+ featureControl.isElrFeatureEnabled()
);
-
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
@@ -2330,7 +2330,8 @@ public class ReplicationControlManager {
partitionIndex,
new LeaderAcceptor(clusterControl,
partitionRegistration),
featureControl.metadataVersionOrThrow(),
- getTopicEffectiveMinIsr(topicName)
+ getTopicEffectiveMinIsr(topicName),
+ featureControl.isElrFeatureEnabled()
)
.setDirectory(brokerId, dirId)
.setDefaultDirProvider(clusterDescriber)
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index e901079ec35..0902e71e11b 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -133,8 +133,8 @@ public class PartitionChangeBuilderTest {
0,
r -> r != 3,
metadataVersion,
- 2).
-
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+ 2,
+ metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
@@ -171,8 +171,8 @@ public class PartitionChangeBuilderTest {
0,
r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version),
- 2).
- setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+ 2,
+ isElrEnabled(version)).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
@@ -198,8 +198,8 @@ public class PartitionChangeBuilderTest {
0,
__ -> true,
metadataVersionForPartitionChangeRecordVersion(version),
- 2).
- setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+ 2,
+ isElrEnabled(version)).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
@@ -240,13 +240,11 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
if (metadataVersion.isElrSupported()) {
return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0,
r -> r == 1,
- metadataVersion, 2).
- setEligibleLeaderReplicasEnabled(true).
+ metadataVersion, 2, true).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
} else {
return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID,
0, r -> r == 1,
- metadataVersion, 2).
- setEligibleLeaderReplicasEnabled(false).
+ metadataVersion, 2, false).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
}
@@ -385,8 +383,8 @@ public class PartitionChangeBuilderTest {
0,
r -> true,
metadataVersion,
- 2).
- setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+ 2,
+ metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
setTargetReplicas(List.of());
PartitionChangeRecord record = new PartitionChangeRecord();
@@ -622,7 +620,8 @@ public class PartitionChangeBuilderTest {
0,
brokerId -> false,
metadataVersion,
- 2
+ 2,
+ metadataVersion.isElrSupported()
);
// Set the target ISR to empty to indicate that the last leader is
offline
offlineBuilder.setTargetIsrWithBrokerStates(List.of());
@@ -648,7 +647,8 @@ public class PartitionChangeBuilderTest {
0,
brokerId -> true,
metadataVersion,
- 2
+ 2,
+ metadataVersion.isElrSupported()
);
// The only broker in the ISR is elected leader and stays in the
recovering
@@ -688,7 +688,8 @@ public class PartitionChangeBuilderTest {
0,
brokerId -> brokerId == leaderId,
metadataVersion,
- 2
+ 2,
+ metadataVersion.isElrSupported()
).setElection(Election.UNCLEAN);
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord)
onlineBuilder
@@ -754,7 +755,8 @@ public class PartitionChangeBuilderTest {
0,
isValidLeader,
MetadataVersion.MINIMUM_VERSION,
- 2
+ 2,
+ MetadataVersion.MINIMUM_VERSION.isElrSupported()
);
// Before we build the new PartitionChangeBuilder, confirm the current
leader is 0.
@@ -792,9 +794,8 @@ public class PartitionChangeBuilderTest {
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
isElrEnabled(version))
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
@@ -845,9 +846,8 @@ public class PartitionChangeBuilderTest {
// No replica is acceptable as leader, so election yields NO_LEADER.
// We intentionally do not change target ISR so record.isr remains
null.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> false,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
isElrEnabled(version))
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);
@@ -886,9 +886,8 @@ public class PartitionChangeBuilderTest {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
isElrEnabled(version))
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
@@ -932,9 +931,8 @@ public class PartitionChangeBuilderTest {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
isElrEnabled(version))
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
@@ -984,9 +982,8 @@ public class PartitionChangeBuilderTest {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
isElrEnabled(version))
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
@@ -1031,7 +1028,7 @@ public class PartitionChangeBuilderTest {
setPartitionEpoch(200).
build();
Optional<ApiMessageAndVersion> built = new
PartitionChangeBuilder(registration, FOO_ID,
- 0, r -> true, MetadataVersion.IBP_3_7_IV2, 2).
+ 0, r -> true, MetadataVersion.IBP_3_7_IV2, 2,
MetadataVersion.IBP_3_7_IV2.isElrSupported()).
setTargetReplicas(List.of(3, 1, 5, 4)).
setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
@@ -1069,7 +1066,7 @@ public class PartitionChangeBuilderTest {
setPartitionEpoch(200).
build();
Optional<ApiMessageAndVersion> built = new
PartitionChangeBuilder(registration, FOO_ID,
- 0, r -> true, MetadataVersion.latestTesting(), 2).
+ 0, r -> true, MetadataVersion.latestTesting(), 2,
MetadataVersion.latestTesting().isElrSupported()).
setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
setDirectory(1, DirectoryId.LOST).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
@@ -1107,9 +1104,8 @@ public class PartitionChangeBuilderTest {
// Make replica 1 offline.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> r != 1,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
isElrEnabled(version))
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
@@ -1156,9 +1152,8 @@ public class PartitionChangeBuilderTest {
// Mark all the replicas offline.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> false,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
true)
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
@@ -1183,9 +1178,8 @@ public class PartitionChangeBuilderTest {
if (lastKnownLeaderEnabled) {
assertArrayEquals(new int[]{1}, partition.lastKnownElr,
partition.toString());
builder = new PartitionChangeBuilder(partition, topicId, 0, r ->
false,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version),
3, true)
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(true)
.setUncleanShutdownReplicas(List.of(2))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
@@ -1214,11 +1208,10 @@ public class PartitionChangeBuilderTest {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> true,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
true)
.setElection(Election.PREFERRED)
.setUseLastKnownLeaderInBalancedRecovery(true)
- .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
- .setEligibleLeaderReplicasEnabled(true);
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
builder.setTargetIsr(List.of());
@@ -1261,9 +1254,8 @@ public class PartitionChangeBuilderTest {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
true)
.setElection(Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);
@@ -1296,9 +1288,8 @@ public class PartitionChangeBuilderTest {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> false,
- metadataVersionForPartitionChangeRecordVersion(version), 3)
+ metadataVersionForPartitionChangeRecordVersion(version), 3,
true)
.setElection(type)
- .setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);