This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 4369d8e07e2 MINOR: simplify last known elr update (#20655)
4369d8e07e2 is described below
commit 4369d8e07e2efec307c9ec08ecf14543299f2bd4
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Oct 8 01:25:21 2025 -0700
MINOR: simplify last known elr update (#20655)
Backport #20629 to 4.1 branch
Simplify the last known elr update logic. This way can make a more
robust logic.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/controller/PartitionChangeBuilder.java | 6 ++--
.../controller/PartitionChangeBuilderTest.java | 42 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index a1d35293bd1..52f3721d18f 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -494,12 +494,10 @@ public class PartitionChangeBuilder {
private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
if (!useLastKnownLeaderInBalancedRecovery ||
!eligibleLeaderReplicasEnabled) return;
- if (record.isr() != null && record.isr().isEmpty() &&
(partition.lastKnownElr.length != 1 ||
- partition.lastKnownElr[0] != partition.leader)) {
+ if (record.leader() == NO_LEADER && partition.lastKnownElr.length ==
0) {
// Only update the last known leader when the first time the
partition becomes leaderless.
record.setLastKnownElr(List.of(partition.leader));
- } else if ((record.leader() >= 0 || (partition.leader != NO_LEADER &&
record.leader() != NO_LEADER))
- && partition.lastKnownElr.length > 0) {
+ } else if (record.leader() >= 0 && partition.lastKnownElr.length > 0) {
// Clear the LastKnownElr field if the partition will have or
continues to have a valid leader.
record.setLastKnownElr(List.of());
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index f836be9e93f..c040b981ee4 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -826,6 +826,48 @@ public class PartitionChangeBuilderTest {
}
}
+ @Test
+ public void
testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader() {
+ PartitionRegistration partition = new PartitionRegistration.Builder()
+ .setReplicas(new int[] {1, 2, 3})
+ .setDirectories(new Uuid[] {
+ DirectoryId.UNASSIGNED,
+ DirectoryId.UNASSIGNED,
+ DirectoryId.UNASSIGNED
+ })
+ .setIsr(new int[] {1})
+ .setElr(new int[] {2})
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(100)
+ .setPartitionEpoch(200)
+ .build();
+
+ short version = 2; // ELR supported
+ Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+ // No replica is acceptable as leader, so election yields NO_LEADER.
+ // We intentionally do not change target ISR so record.isr remains
null.
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId, 0, r -> false,
+ metadataVersionForPartitionChangeRecordVersion(version), 3)
+ .setElection(Election.PREFERRED)
+ .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
+ .setUseLastKnownLeaderInBalancedRecovery(true);
+
+ ApiMessageAndVersion change = builder.build().get();
+ PartitionChangeRecord record = (PartitionChangeRecord)
change.message();
+
+ assertEquals(NO_LEADER, record.leader());
+ // There is no ISR update if we do not perform the leader verification
on the ISR members.
+ assertNull(record.isr(), record.toString());
+ assertEquals(1, record.lastKnownElr().size(), record.toString());
+ assertEquals(1, record.lastKnownElr().get(0), record.toString());
+ partition = partition.merge((PartitionChangeRecord)
builder.build().get().message());
+ assertArrayEquals(new int[] {1}, partition.lastKnownElr);
+ }
+
+
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version)
{