This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d267d45369 KAFKA-19975 Fix blocked reassignment completion due to ISR 
missing replicas (#21114)
3d267d45369 is described below

commit 3d267d45369818c804ed49c56e9ae405e28b234c
Author: Deepak Goyal <[email protected]>
AuthorDate: Fri Jan 16 23:45:52 2026 +0530

    KAFKA-19975 Fix blocked reassignment completion due to ISR missing replicas 
(#21114)
    
    The reassignment completion logic currently requires that all target
    replicas be present in the in-sync replica set (ISR). If an unhealthy
    broker exists in both the current and target replica sets (i.e., it is
    neither added nor removed during the reassignment), it still blocks the
    reassignment from completing.
    
    For example, the following reassignments would fail to complete if
    replica 2 were unhealthy and unable to join the ISR, even though
    completing the reassignment would not reduce the number of replicas in
    the ISR:
    
    - [0, 1, 2] -> [0, 1, 2, 3]
    - [0, 1, 2] -> [1, 2, 3]
    - [0, 1, 2] -> [2, 3, 4]
    
    We should allow a reassignment to complete even when some replicas in
    the target replica set are missing from the ISR, as long as none of
    them belong to the adding set (i.e., each was already part of the
    current replica set). When the replication factor is not reduced, this
    is safe: every adding replica must be in the ISR, so completing the
    reassignment cannot leave the ISR smaller than it would be under the
    current assignment. However, if the target replica set is smaller
    than the current replica set, we will continue to require that all
    replicas in the target set be in the ISR to prevent the reassignment
    from unintentionally shrinking the ISR.
    
    Reviewers: Alyssa Huang <[email protected]>, Jun Rao <[email protected]>
---
 .../controller/PartitionReassignmentReplicas.java  |  10 +-
 .../PartitionReassignmentReplicasTest.java         | 106 ++++++++++++++++++++-
 2 files changed, 110 insertions(+), 6 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
index 5f85dd1a3d6..00de30445bb 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -119,7 +119,15 @@ class PartitionReassignmentReplicas {
             }
             if (newTargetReplicas.isEmpty()) return Optional.empty();
         }
-        if (!newTargetIsr.containsAll(newTargetReplicas)) return 
Optional.empty();
+
+        // Wait for all adding sync replicas to join the ISR, as new brokers 
could be unhealthy.
+        if (!newTargetIsr.containsAll(adding)) return Optional.empty();
+
+        // If the replication factor is being reduced, wait for all target 
sync replicas to be present in newTargetIsr to avoid
+        // the potential negative impact on ISR. For example: We reassign 
[0,1,2,3,4] -> [2,3,4,5] when only 0, 1 and 5 are in
+        // ISR. If we complete the reassignment, we end up with ISR of [5]. If 
we stay with the current assignment, ISR is
+        // [0,1]. So, by completing the reassignment in this case, we are 
reducing the size of ISR unnecessarily.
+        if (adding.size() < removing.size() && 
!newTargetIsr.containsAll(newTargetReplicas)) return Optional.empty();
 
         return Optional.of(
             new CompletedReassignment(
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index 33c540b9d14..8d6c716781a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -105,7 +105,7 @@ public class PartitionReassignmentReplicasTest {
     }
 
     @Test
-    public void testDoesCompleteReassignmentAllNewReplicas() {
+    public void testCanCompleteReassignmentAllNewReplicas() {
         PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
             partitionAssignment(List.of(0, 1, 2)), 
partitionAssignment(List.of(3, 4, 5)));
         assertTrue(replicas.isReassignmentInProgress());
@@ -118,7 +118,7 @@ public class PartitionReassignmentReplicasTest {
     }
 
     @Test
-    public void testDoesCompleteReassignmentSomeNewReplicas() {
+    public void testCanCompleteReassignmentSomeNewReplicas() {
         PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
             partitionAssignment(List.of(0, 1, 2)), 
partitionAssignment(List.of(0, 1, 3)));
         assertTrue(replicas.isReassignmentInProgress());
@@ -196,13 +196,109 @@ public class PartitionReassignmentReplicasTest {
                 build()));
     }
 
+    // Tests that a reassignment completes when a target replica (that is also 
present
+    // in the replica set prior to the reassignment) is missing from the ISR.
     @Test
-    public void 
testDoesNotCompleteReassignmentIfIsrDoesNotHaveAllTargetReplicas() {
+    public void 
testCanCompleteReassignmentIfIsrDoesNotHaveAnExistingTargetReplica() {
         PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
-            partitionAssignment(List.of(0, 1, 2)), 
partitionAssignment(List.of(0, 1, 3)));
+                partitionAssignment(List.of(0, 1, 2)), 
partitionAssignment(List.of(0, 1, 3)));
+        assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 1 is not in sync
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
+                replicas.maybeCompleteReassignment(List.of(0, 2, 3));
+        assertTrue(reassignmentOptional.isPresent());
+        PartitionReassignmentReplicas.CompletedReassignment 
completedReassignment = reassignmentOptional.get();
+        assertEquals(List.of(0, 3), completedReassignment.isr());
+        assertEquals(List.of(0, 1, 3), completedReassignment.replicas());
+    }
+
+    // Tests that a reassignment completes when multiple target replicas (that 
are also present
+    // in the replica set prior to the reassignment) are missing from the ISR.
+    @Test
+    public void 
testCanCompleteReassignmentIfIsrDoesNotHaveBothExistingTargetReplica() {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+                partitionAssignment(List.of(0, 1, 2)), 
partitionAssignment(List.of(0, 1, 3)));
+        assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 0 and 1 are not in sync
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
+                replicas.maybeCompleteReassignment(List.of(2, 3));
+        assertTrue(reassignmentOptional.isPresent());
+        PartitionReassignmentReplicas.CompletedReassignment 
completedReassignment = reassignmentOptional.get();
+        assertEquals(List.of(3), completedReassignment.isr());
+        assertEquals(List.of(0, 1, 3), completedReassignment.replicas());
+    }
+
+    // Tests that a reassignment does not complete when a target adding 
replica is missing from the ISR.
+    @Test
+    public void 
testDoesNotCompleteReassignmentIfIsrDoesNotHaveAnAddingTargetReplica() {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+                partitionAssignment(List.of(0, 1, 2)), 
partitionAssignment(List.of(0, 1, 3)));
         assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 3 is not in sync
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
+                replicas.maybeCompleteReassignment(List.of(0, 1, 2));
+        assertFalse(reassignmentOptional.isPresent());
+    }
+
+    // Tests that a reassignment completes when RF is increasing and a target 
replica (that is also
+    // present in the replica set prior to the reassignment) is missing from 
the ISR.
+    @Test
+    public void 
testCanCompleteReassignmentWhenReplicationFactorIncreasesAndMissingAnExistingTargetReplicaFromIsr()
 {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+                partitionAssignment(List.of(0, 1, 2, 3)), 
partitionAssignment(List.of(1, 2, 3, 4, 5)));
+        assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 3 is not in sync
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
+                replicas.maybeCompleteReassignment(List.of(0, 1, 2, 4, 5));
+        assertTrue(reassignmentOptional.isPresent());
+        PartitionReassignmentReplicas.CompletedReassignment 
completedReassignment = reassignmentOptional.get();
+        assertEquals(List.of(1, 2, 4, 5), completedReassignment.isr());
+        assertEquals(List.of(1, 2, 3, 4, 5), completedReassignment.replicas());
+    }
+
+    // Tests that a reassignment does not complete when RF is increasing
+    // and a target adding replica is missing from the ISR
+    @Test
+    public void 
testDoesNotCompleteReassignmentWhenReplicationFactorIncreasesAndMissingAnAddingReplicaFromIsr()
 {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+                partitionAssignment(List.of(0, 1, 2, 3)), 
partitionAssignment(List.of(1, 2, 3, 4, 5)));
+        assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 4 is not in sync
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
+                replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 5));
+        assertFalse(reassignmentOptional.isPresent());
+    }
+
+    // Tests that a reassignment does not complete when RF is decreasing and a 
target replica (that is also
+    // present in the replica set prior to the reassignment) is missing from 
the ISR.
+    @Test
+    public void 
testDoesNotCompleteReassignmentWhenReplicationFactorDecreasesAndMissingAnExistingTargetReplicaFromIsr()
 {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+                partitionAssignment(List.of(0, 1, 2, 3, 4)), 
partitionAssignment(List.of(2, 3, 4, 5)));
+        assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 4 is not in sync
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
+                replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 5));
+        assertFalse(reassignmentOptional.isPresent());
+    }
+
+    // Tests that a reassignment does not complete when RF is decreasing
+    // and a target adding replica is missing from the ISR.
+    @Test
+    public void 
testDoesNotCompleteReassignmentWhenReplicationFactorDecreasesAndMissingAnAddingReplicasFromISR()
 {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+                partitionAssignment(List.of(0, 1, 2, 3, 4)), 
partitionAssignment(List.of(2, 3, 4, 5)));
+        assertTrue(replicas.isReassignmentInProgress());
+
+        // Replica 5 is not in sync
         Optional<PartitionReassignmentReplicas.CompletedReassignment> 
reassignmentOptional =
-            replicas.maybeCompleteReassignment(List.of(3));
+                replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 4));
         assertFalse(reassignmentOptional.isPresent());
     }
 

Reply via email to