This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 4369d8e07e2 MINOR: simplify last known elr update (#20655)
4369d8e07e2 is described below

commit 4369d8e07e2efec307c9ec08ecf14543299f2bd4
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Oct 8 01:25:21 2025 -0700

    MINOR: simplify last known elr update (#20655)
    
    Backport #20629 to 4.1 branch
    
    Simplify the last known elr update logic. This way can make a more
    robust logic.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/controller/PartitionChangeBuilder.java   |  6 ++--
 .../controller/PartitionChangeBuilderTest.java     | 42 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index a1d35293bd1..52f3721d18f 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -494,12 +494,10 @@ public class PartitionChangeBuilder {
 
     private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
         if (!useLastKnownLeaderInBalancedRecovery || 
!eligibleLeaderReplicasEnabled) return;
-        if (record.isr() != null && record.isr().isEmpty() && 
(partition.lastKnownElr.length != 1 ||
-            partition.lastKnownElr[0] != partition.leader)) {
+        if (record.leader() == NO_LEADER && partition.lastKnownElr.length == 
0) {
             // Only update the last known leader when the first time the 
partition becomes leaderless.
             record.setLastKnownElr(List.of(partition.leader));
-        } else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && 
record.leader() != NO_LEADER))
-            && partition.lastKnownElr.length > 0) {
+        } else if (record.leader() >= 0 && partition.lastKnownElr.length > 0) {
             // Clear the LastKnownElr field if the partition will have or 
continues to have a valid leader.
             record.setLastKnownElr(List.of());
         }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index f836be9e93f..c040b981ee4 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -826,6 +826,48 @@ public class PartitionChangeBuilderTest {
         }
     }
 
+    @Test
+    public void 
testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader() {
+        PartitionRegistration partition = new PartitionRegistration.Builder()
+            .setReplicas(new int[] {1, 2, 3})
+            .setDirectories(new Uuid[] {
+                DirectoryId.UNASSIGNED,
+                DirectoryId.UNASSIGNED,
+                DirectoryId.UNASSIGNED
+            })
+            .setIsr(new int[] {1})
+            .setElr(new int[] {2})
+            .setLeader(1)
+            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+            .setLeaderEpoch(100)
+            .setPartitionEpoch(200)
+            .build();
+
+        short version = 2; // ELR supported
+        Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+        // No replica is acceptable as leader, so election yields NO_LEADER.
+        // We intentionally do not change target ISR so record.isr remains 
null.
+        PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, 
topicId, 0, r -> false,
+            metadataVersionForPartitionChangeRecordVersion(version), 3)
+            .setElection(Election.PREFERRED)
+            .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
+            .setUseLastKnownLeaderInBalancedRecovery(true);
+
+        ApiMessageAndVersion change = builder.build().get();
+        PartitionChangeRecord record = (PartitionChangeRecord) 
change.message();
+
+        assertEquals(NO_LEADER, record.leader());
+        // There is no ISR update if we do not perform the leader verification 
on the ISR members.
+        assertNull(record.isr(), record.toString());
+        assertEquals(1, record.lastKnownElr().size(), record.toString());
+        assertEquals(1, record.lastKnownElr().get(0), record.toString());
+        partition = partition.merge((PartitionChangeRecord) 
builder.build().get().message());
+        assertArrayEquals(new int[] {1}, partition.lastKnownElr);
+    }
+
+
     @ParameterizedTest
     @MethodSource("partitionChangeRecordVersions")
     public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) 
{

Reply via email to