somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024994


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##########
@@ -1438,4 +1464,1011 @@ public void testIsExternalViewConverged() {
       }
     }
   }
+
+  @Test
+  public void testAssignmentWithServerBatching() {
+    // Using LLC segment naming so that batching can be tested for both 
non-strict replica group and strict replica
+    // group based assignment. Otherwise, the partitionId might have to be 
fetched from ZK SegmentMetadata which will
+    // not exist since these aren't actual tables
+    // Current assignment:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    currentAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+    currentAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4"), ONLINE));
+    currentAssignment.put("segment__3__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+    currentAssignment.put("segment__4__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4"), ONLINE));
+
+    // Target assignment 1:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host5"), ONLINE));
+    targetAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+    targetAssignment.put("segment__3__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host5"), ONLINE));
+    targetAssignment.put("segment__4__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 2,
+    //   "host4": 0,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    Map<String, Integer> numSegmentsToOffloadMap =
+        TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, 
targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas with or without 
strict replica-group should reach the target
+    // assignment after two steps. Batch size = 1, unique partitionIds
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Object2IntOpenHashMap<String> segmentToPartitionIdMap = new 
Object2IntOpenHashMap<>();
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, 
targetAssignment, 2, enableStrictReplicaGroup, false,
+              1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      assertNotEquals(nextAssignment, targetAssignment);
+      assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+      assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+      assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+      assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 
2, enableStrictReplicaGroup, false,
+              1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      assertEquals(nextAssignment, targetAssignment);
+    }
+
+    // Next assignment with 2 minimum available replicas with our without 
strict replica-group should reach the target
+    // assignment after one steps. Batch size = 2, unique partitionIds
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Object2IntOpenHashMap<String> segmentToPartitionIdMap = new 
Object2IntOpenHashMap<>();
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, 
targetAssignment, 2, enableStrictReplicaGroup, false,
+              2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      assertEquals(nextAssignment, targetAssignment);
+    }
+
+    // Target assignment 2:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+    targetAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host4", "host5"), ONLINE));
+    targetAssignment.put("segment__3__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+    targetAssignment.put("segment__4__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host4", "host5"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 4,
+    //   "host4": -2,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    numSegmentsToOffloadMap = 
TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas with or without 
strict replica-group should finish in 3 steps
+    // with batching with batchSizePerServer = 1:
+    //
+    // The first assignment will move "segment1" and "segment2" by one host at 
a time, and since the other segments
+    // if added will go beyond batchSizePerServer, they'll be picked up on the 
next two assignments (host4 and host2):
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    // Second Assignment (host6, host1, host4, and host5 get 1 segment each):
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    //
+    // The third assignment should reach the target assignment
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, 
targetAssignment, 2, enableStrictReplicaGroup, false,
+              1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+      assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+      assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+      assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 
2, enableStrictReplicaGroup, false,
+              1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+      assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+      assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+      assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+          new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 
2, enableStrictReplicaGroup, false,
+              1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      assertEquals(nextAssignment, targetAssignment);
+    }
+
+    // Target assignment 3:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host4"), ONLINE));
+    targetAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host4"), ONLINE));
+    targetAssignment.put("segment__3__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host4"), ONLINE));
+    targetAssignment.put("segment__4__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host4"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": -2,
+    //   "host2": 4,
+    //   "host3": 0,
+    //   "host4": -2
+    // }
+    numSegmentsToOffloadMap = 
TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+
+    // Next assignment with 2 minimum available replicas without strict 
replica-group should reach the target assignment
+    // in 1 step using batchSizePerServer = 2
+    Map<String, Map<String, String>> nextAssignment =
+        TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 
2, false, false,
+            2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+    assertEquals(nextAssignment, targetAssignment);
+
+    // Next assignment with 2 minimum available replicas with strict 
replica-group should finish in 2 steps even with
+    // batchSizePerServer = 2:
+    //
+    // The first assignment will bring "segment1" and "segment3" to the target 
state. It cannot bring "segment2" and
+    // "segment4" to the target state because "host1" and "host4" might be 
unavailable for strict replica-group routing,
+    // which breaks the minimum available replicas requirement:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    //
+    // The second assignment should reach the target assignment
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, 
targetAssignment, 2, true, false,
+        2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+    assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+        new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+        new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+        new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+        new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, 
targetAssignment, 2, true, false,
+        2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+    assertEquals(nextAssignment, targetAssignment);
+
+    // Try assignment with overlapping partitions across segments, especially 
for strict replica group based.
+    // Non-strict replica group based should not matter
+    // Current assignment:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__1__1__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__2__1__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    currentAssignment = new TreeMap<>();
+    currentAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+    currentAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4"), ONLINE));
+    currentAssignment.put("segment__1__1__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+    currentAssignment.put("segment__2__1__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4"), ONLINE));
+
+    // Target assignment 1 with just 2 overall partitions instead of 4 unique 
ones:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__1__1__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__2__1__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host5"), ONLINE));
+    targetAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+    targetAssignment.put("segment__1__1__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host5"), ONLINE));
+    targetAssignment.put("segment__2__1__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 2,
+    //   "host4": 0,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    numSegmentsToOffloadMap =
+        TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, 
targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas without strict 
replica-group should reach the target
+    // assignment after two steps. With strict replica groups it should reach 
the target assignment immediately since
+    // the full partition must be selected for movement. Batch size = 1, 
unique partitionIds
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Object2IntOpenHashMap<String> segmentToPartitionIdMap = new 
Object2IntOpenHashMap<>();
+      nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, 
targetAssignment, 2, enableStrictReplicaGroup, false,
+              1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      if (!enableStrictReplicaGroup) {
+        // Nothing should change, since we don't select based on partitions 
for non-strict replica groups
+        assertNotEquals(nextAssignment, targetAssignment);
+        
assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+        
assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+        
assertEquals(nextAssignment.get("segment__1__1__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+        
assertEquals(nextAssignment.get("segment__2__1__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, 
targetAssignment, 2, enableStrictReplicaGroup, false,
+                1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      }
+      assertEquals(nextAssignment, targetAssignment);
+    }
+
+    // Target assignment 2 with just 2 unique partitions:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+    targetAssignment.put("segment__2__0__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host4", "host5"), ONLINE));
+    targetAssignment.put("segment__1__1__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+    targetAssignment.put("segment__2__1__98347869999L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host4", "host5"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 4,
+    //   "host4": -2,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    numSegmentsToOffloadMap =
+        TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, 
targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas without strict 
replica-group should reach the target
+    // assignment after two steps. With strict replica groups it should reach 
the target assignment immediately since
+    // the full partition must be selected for movement. Batch size = 2, 
unique partitionIds

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to