This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b89d1b4a83 [Bug fix] Fault-Domain-Aware Instance Assignment failing rebalance with minimize data movement (#17799)
7b89d1b4a83 is described below

commit 7b89d1b4a83b10f96fd940fca549c97acc16e7a4
Author: Jhow <[email protected]>
AuthorDate: Wed Mar 4 08:44:32 2026 -0800

    [Bug fix] Fault-Domain-Aware Instance Assignment failing rebalance with minimize data movement (#17799)
---
 .../instance/FDAwareInstancePartitionSelector.java |  8 ++-
 .../instance/InstanceAssignmentTest.java           | 61 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index 89d64272e3d..10db8702f39 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -190,7 +191,7 @@ public class FDAwareInstancePartitionSelector extends 
InstancePartitionSelector
       // preprocess the problem of numReplicaGroups >= numFaultDomains to a 
problem
       
replicaGroupBasedAssignmentState.normalize(faultDomainToCandidateInstancesMap);
 
-      // fill the remaining vacant seats
+      // fill the remaining vacant seats if any
       
replicaGroupBasedAssignmentState.fill(faultDomainToCandidateInstancesMap);
 
       // adjust the instance assignment to achieve the invariant state
@@ -389,6 +390,11 @@ public class FDAwareInstancePartitionSelector extends 
InstancePartitionSelector
      * Fill the vacant instances
      */
     public void fill(Map<Integer, LinkedHashSet<String>> 
faultDomainToCandidateInstancesMap) {
+      // skip filling if there is no candidate instance, which can happen when 
minimize data movement is enabled and
+      // no new instances are added to any pool
+      if 
(faultDomainToCandidateInstancesMap.values().stream().allMatch(Set::isEmpty)) {
+        return;
+      }
       // convert set to que and start to assign
       CandidateQueue candidateQueue = new 
CandidateQueue(faultDomainToCandidateInstancesMap);
       if (_numReplicaGroups != 0) { // uplift instance per replica group first 
if not a fresh new assignment
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 74badfb7fcd..62a1af26bd6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -3330,4 +3330,65 @@ public class InstanceAssignmentTest {
             SERVER_INSTANCE_ID_PREFIX + "11" + SERVER_INSTANCE_POOL_PREFIX + 1,
             SERVER_INSTANCE_ID_PREFIX + "17" + SERVER_INSTANCE_POOL_PREFIX + 
2));
   }
+
+  @Test
+  public void testPoolBasedFDAwareSteadyStateMinimizeDataMovement() {
+    // Test that a rebalance with minimizeDataMovement=true and no instance 
changes does not throw
+    // NoSuchElementException. This is a regression test for the case where 
all candidate instances are empty
+    // after preprocessing (no new instances added to any pool).
+
+    // 21 instances in 5 pools, with [5,4,4,4,4] instances in each pool
+    int numInstances = 21;
+    int numPools = 5;
+    int numReplicaGroups = 3;
+    int numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      int pool = i % numPools;
+      InstanceConfig instanceConfig =
+          new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i + 
SERVER_INSTANCE_POOL_PREFIX + pool);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, 
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Initial assignment (no minimize data movement, no existing partitions)
+    InstanceTagPoolConfig tagPoolConfig = new 
InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, 0, 0, false,
+            null);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+        .build();
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+    InstancePartitions initialPartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null);
+    assertEquals(initialPartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(initialPartitions.getNumPartitions(), 1);
+
+    // Now re-run with the same instances and minimizeDataMovement=true, 
passing existing partitions.
+    // Before the fix in #17799, this would throw NoSuchElementException 
because CandidateQueue was created with an
+    // empty map (all existing instances were removed from candidates during 
preprocessing, leaving empty sets).
+    replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, 0, 0, true,
+            null);
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
+        .build();
+    driver = new InstanceAssignmentDriver(tableConfig);
+    InstancePartitions steadyStatePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, initialPartitions);
+
+    // Assignment should be unchanged
+    assertEquals(steadyStatePartitions.getNumReplicaGroups(), 
numReplicaGroups);
+    assertEquals(steadyStatePartitions.getNumPartitions(), 1);
+    for (int rg = 0; rg < numReplicaGroups; rg++) {
+      assertEquals(steadyStatePartitions.getInstances(0, rg), 
initialPartitions.getInstances(0, rg));
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to