This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d267d45369 KAFKA-19975 Fix blocked reassignment completion due to ISR
missing replicas (#21114)
3d267d45369 is described below
commit 3d267d45369818c804ed49c56e9ae405e28b234c
Author: Deepak Goyal <[email protected]>
AuthorDate: Fri Jan 16 23:45:52 2026 +0530
KAFKA-19975 Fix blocked reassignment completion due to ISR missing replicas
(#21114)
The reassignment completion logic currently requires that all target
replicas be present in the in-sync replica set (ISR). If an unhealthy
broker exists in both the current and target replica sets (i.e., it is
neither added nor removed during the reassignment), it still blocks the
reassignment from completing.
For example, the following reassignments would fail to complete if
replica 2 were unhealthy and unable to join the ISR, even though
completing the reassignment would not reduce the number of replicas in
the ISR:
- [0, 1, 2] -> [0, 1, 2, 3]
- [0, 1, 2] -> [1, 2, 3]
- [0, 1, 2] -> [2, 3, 4]
We should allow a reassignment to complete even when some replicas in
the target replica set are not in the ISR, provided those replicas are
not part of the adding set (i.e., every adding replica has joined the
ISR). This is safe because completing the reassignment would not
decrease the ISR size. However, if the target replica set is smaller
than the current replica set, we will continue to require that all
replicas in the target set are in the ISR to prevent the reassignment
from unintentionally shrinking the ISR.
Reviewers: Alyssa Huang <[email protected]>, Jun Rao <[email protected]>
---
.../controller/PartitionReassignmentReplicas.java | 10 +-
.../PartitionReassignmentReplicasTest.java | 106 ++++++++++++++++++++-
2 files changed, 110 insertions(+), 6 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
index 5f85dd1a3d6..00de30445bb 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -119,7 +119,15 @@ class PartitionReassignmentReplicas {
}
if (newTargetReplicas.isEmpty()) return Optional.empty();
}
- if (!newTargetIsr.containsAll(newTargetReplicas)) return
Optional.empty();
+
+ // Wait for all adding sync replicas to join the ISR, as new brokers
could be unhealthy.
+ if (!newTargetIsr.containsAll(adding)) return Optional.empty();
+
+ // If the replication factor is being reduced, wait for all target
sync replicas to be present in newTargetIsr to avoid
+ // the potential negative impact on ISR. For example: We reassign
[0,1,2,3,4] -> [2,3,4,5] when only 0, 1 and 5 are in
+ // ISR. If we complete the reassignment, we end up with ISR of [5]. If
we stay with the current assignment, ISR is
+ // [0,1]. So, by completing the reassignment in this case, we are
reducing the size of ISR unnecessarily.
+ if (adding.size() < removing.size() &&
!newTargetIsr.containsAll(newTargetReplicas)) return Optional.empty();
return Optional.of(
new CompletedReassignment(
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index 33c540b9d14..8d6c716781a 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -105,7 +105,7 @@ public class PartitionReassignmentReplicasTest {
}
@Test
- public void testDoesCompleteReassignmentAllNewReplicas() {
+ public void testCanCompleteReassignmentAllNewReplicas() {
PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
partitionAssignment(List.of(0, 1, 2)),
partitionAssignment(List.of(3, 4, 5)));
assertTrue(replicas.isReassignmentInProgress());
@@ -118,7 +118,7 @@ public class PartitionReassignmentReplicasTest {
}
@Test
- public void testDoesCompleteReassignmentSomeNewReplicas() {
+ public void testCanCompleteReassignmentSomeNewReplicas() {
PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
partitionAssignment(List.of(0, 1, 2)),
partitionAssignment(List.of(0, 1, 3)));
assertTrue(replicas.isReassignmentInProgress());
@@ -196,13 +196,109 @@ public class PartitionReassignmentReplicasTest {
build()));
}
+ // Tests that a reassignment completes when a target replica (that is also
present
+ // in the replica set prior to the reassignment) is missing from the ISR.
@Test
- public void
testDoesNotCompleteReassignmentIfIsrDoesNotHaveAllTargetReplicas() {
+ public void
testCanCompleteReassignmentIfIsrDoesNotHaveAnExistingTargetReplica() {
PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
- partitionAssignment(List.of(0, 1, 2)),
partitionAssignment(List.of(0, 1, 3)));
+ partitionAssignment(List.of(0, 1, 2)),
partitionAssignment(List.of(0, 1, 3)));
+ assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 1 is not in sync
+ Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
+ replicas.maybeCompleteReassignment(List.of(0, 2, 3));
+ assertTrue(reassignmentOptional.isPresent());
+ PartitionReassignmentReplicas.CompletedReassignment
completedReassignment = reassignmentOptional.get();
+ assertEquals(List.of(0, 3), completedReassignment.isr());
+ assertEquals(List.of(0, 1, 3), completedReassignment.replicas());
+ }
+
+ // Tests that a reassignment completes when multiple target replicas (that
are also present
+ // in the replica set prior to the reassignment) are missing from the ISR.
+ @Test
+ public void
testCanCompleteReassignmentIfIsrDoesNotHaveBothExistingTargetReplica() {
+ PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
+ partitionAssignment(List.of(0, 1, 2)),
partitionAssignment(List.of(0, 1, 3)));
+ assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 0 and 1 are not in sync
+ Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
+ replicas.maybeCompleteReassignment(List.of(2, 3));
+ assertTrue(reassignmentOptional.isPresent());
+ PartitionReassignmentReplicas.CompletedReassignment
completedReassignment = reassignmentOptional.get();
+ assertEquals(List.of(3), completedReassignment.isr());
+ assertEquals(List.of(0, 1, 3), completedReassignment.replicas());
+ }
+
+ // Tests that a reassignment does not complete when a target adding
replica is missing from the ISR.
+ @Test
+ public void
testDoesNotCompleteReassignmentIfIsrDoesNotHaveAnAddingTargetReplica() {
+ PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
+ partitionAssignment(List.of(0, 1, 2)),
partitionAssignment(List.of(0, 1, 3)));
assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 3 is not in sync
+ Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
+ replicas.maybeCompleteReassignment(List.of(0, 1, 2));
+ assertFalse(reassignmentOptional.isPresent());
+ }
+
+ // Tests that a reassignment completes when RF is increasing and a target
replica (that is also
+ // present in the replica set prior to the reassignment) is missing from
the ISR.
+ @Test
+ public void
testCanCompleteReassignmentWhenReplicationFactorIncreasesAndMissingAnExistingTargetReplicaFromIsr()
{
+ PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
+ partitionAssignment(List.of(0, 1, 2, 3)),
partitionAssignment(List.of(1, 2, 3, 4, 5)));
+ assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 3 is not in sync
+ Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
+ replicas.maybeCompleteReassignment(List.of(0, 1, 2, 4, 5));
+ assertTrue(reassignmentOptional.isPresent());
+ PartitionReassignmentReplicas.CompletedReassignment
completedReassignment = reassignmentOptional.get();
+ assertEquals(List.of(1, 2, 4, 5), completedReassignment.isr());
+ assertEquals(List.of(1, 2, 3, 4, 5), completedReassignment.replicas());
+ }
+
+ // Tests that a reassignment does not complete when RF is increasing
+ // and a target adding replica is missing from the ISR
+ @Test
+ public void
testDoesNotCompleteReassignmentWhenReplicationFactorIncreasesAndMissingAnAddingReplicaFromIsr()
{
+ PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
+ partitionAssignment(List.of(0, 1, 2, 3)),
partitionAssignment(List.of(1, 2, 3, 4, 5)));
+ assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 4 is not in sync
+ Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
+ replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 5));
+ assertFalse(reassignmentOptional.isPresent());
+ }
+
+ // Tests that a reassignment does not complete when RF is decreasing and a
target replica (that is also
+ // present in the replica set prior to the reassignment) is missing from
the ISR.
+ @Test
+ public void
testDoesNotCompleteReassignmentWhenReplicationFactorDecreasesAndMissingAnExistingTargetReplicaFromIsr()
{
+ PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
+ partitionAssignment(List.of(0, 1, 2, 3, 4)),
partitionAssignment(List.of(2, 3, 4, 5)));
+ assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 4 is not in sync
+ Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
+ replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 5));
+ assertFalse(reassignmentOptional.isPresent());
+ }
+
+ // Tests that a reassignment does not complete when RF is decreasing
+ // and a target adding replica is missing from the ISR.
+ @Test
+ public void
testDoesNotCompleteReassignmentWhenReplicationFactorDecreasesAndMissingAnAddingReplicasFromISR()
{
+ PartitionReassignmentReplicas replicas = new
PartitionReassignmentReplicas(
+ partitionAssignment(List.of(0, 1, 2, 3, 4)),
partitionAssignment(List.of(2, 3, 4, 5)));
+ assertTrue(replicas.isReassignmentInProgress());
+
+ // Replica 5 is not in sync
Optional<PartitionReassignmentReplicas.CompletedReassignment>
reassignmentOptional =
- replicas.maybeCompleteReassignment(List.of(3));
+ replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 4));
assertFalse(reassignmentOptional.isPresent());
}