yupeng9 commented on a change in pull request #6212:
URL: https://github.com/apache/incubator-pinot/pull/6212#discussion_r514868201



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
##########
@@ -546,22 +554,61 @@ static boolean isExternalViewConverged(String tableNameWithType,
     return true;
   }
 
-  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
-      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas) {
+  /**
+   * Returns the next assignment for the table based on the current assignment and the target assignment with regards to
+   * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
+   * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
+   * is meet. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.

Review comment:
       Typo: "is meet" -> "is met"

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
##########
@@ -546,22 +554,61 @@ static boolean isExternalViewConverged(String tableNameWithType,
     return true;
   }
 
-  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
-      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas) {
+  /**
+   * Returns the next assignment for the table based on the current assignment and the target assignment with regards to
+   * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
+   * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
+   * is meet. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
-
-    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      nextAssignment.put(segmentName,
-          getNextInstanceStateMap(entry.getValue(), targetAssignment.get(segmentName), minAvailableReplicas));
+    if (enableStrictReplicaGroup) {
+      Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
+      for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+        String segmentName = entry.getKey();
+        Map<String, String> currentInstanceStateMap = entry.getValue();
+        Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+        SingleSegmentAssignment assignment =
+            getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas);
+        Set<String> assignedInstances = assignment._instanceStateMap.keySet();
+        Set<String> availableInstances = assignment._availableInstances;
+        Set<String> currentAvailableInstances = availableInstancesMap.get(assignedInstances);
+        if (currentAvailableInstances == null) {
+          // First segment assigned to these instances, use the new assignment and update the available instances
+          nextAssignment.put(segmentName, assignment._instanceStateMap);
+          availableInstancesMap.put(assignedInstances, availableInstances);
+        } else {
+          // There are other segments assigned to the same instances, check the available instances to see if adding the
+          // new assignment can still hold the minimum available replicas requirement
+          availableInstances.retainAll(currentAvailableInstances);
+          if (availableInstances.size() >= minAvailableReplicas) {
+            // New assignment can be added
+            nextAssignment.put(segmentName, assignment._instanceStateMap);
+            availableInstancesMap.put(assignedInstances, availableInstances);
+          } else {
+            // New assignment cannot be added, use the current instance state map

Review comment:
       Hmm, does the greedy algorithm always work? Is it possible that the iteration order of `currentAssignment.entrySet()` matters?
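
To make the question concrete, here is a minimal, self-contained sketch of the greedy check from the diff above. It is not Pinot code: `GreedyOrderSketch`, `Candidate`, and `acceptedSegments` are hypothetical names, and each candidate's assigned and available instances are given directly instead of being computed per segment. Under these assumptions, the minimum available replicas requirement holds for either order, but which segments get their new assignment in a given step does depend on the iteration order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class GreedyOrderSketch {
  // Hypothetical stand-in for one segment's proposed next assignment (not a Pinot class).
  static final class Candidate {
    final String segment;
    final Set<String> assigned;   // instances in the proposed next assignment
    final Set<String> available;  // instances assumed to stay available during the move

    Candidate(String segment, List<String> assigned, List<String> available) {
      this.segment = segment;
      this.assigned = new TreeSet<>(assigned);
      this.available = new TreeSet<>(available);
    }
  }

  // Greedy pass: accept a candidate only if the intersection of available instances for
  // all accepted segments sharing the same assigned instances stays large enough.
  static List<String> acceptedSegments(List<Candidate> candidates, int minAvailableReplicas) {
    Map<Set<String>, Set<String>> availableByAssigned = new HashMap<>();
    List<String> accepted = new ArrayList<>();
    for (Candidate candidate : candidates) {
      Set<String> tracked = availableByAssigned.get(candidate.assigned);
      Set<String> merged = new TreeSet<>(candidate.available);
      if (tracked != null) {
        merged.retainAll(tracked);
      }
      if (tracked == null || merged.size() >= minAvailableReplicas) {
        accepted.add(candidate.segment);
        availableByAssigned.put(candidate.assigned, merged);
      }
      // Otherwise the segment would keep its current assignment for this step.
    }
    return accepted;
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList("h1", "h2", "h3");
    Candidate a = new Candidate("A", hosts, Arrays.asList("h1", "h2"));
    Candidate b = new Candidate("B", hosts, Arrays.asList("h2", "h3"));
    Candidate c = new Candidate("C", hosts, Arrays.asList("h1", "h2"));
    System.out.println(acceptedSegments(Arrays.asList(a, b, c), 2)); // [A, C]
    System.out.println(acceptedSegments(Arrays.asList(b, a, c), 2)); // [B]
  }
}
```

In this toy input, processing "B" first blocks both "A" and "C", so the order affects how many segments make progress per step, not whether the requirement is respected.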

##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
##########
@@ -123,72 +134,232 @@ public void testTwoMinAvailableReplicas() {
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
-    Map<String, String> nextInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        TableRebalancer.getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
 
     // With 2 common instances, next assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    assignment = TableRebalancer.getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
 
     // With 1 common instance, next assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
     // [host1, host2, host5, host6]
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment = TableRebalancer.getNextAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
 
     // Without common instance, next assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
     // [host1, host2, host5, host6]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment = TableRebalancer.getNextAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
 
     // With increasing number of replicas, next assignment should have 1 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
     // [host1, host2, host5, host6, host7]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment = TableRebalancer.getNextAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
 
     // With decreasing number of replicas, next assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
     // [host1, host2, host5]
-    Map<String, String> firstRoundInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, firstRoundInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should have 2 common instances with first round assignment
     // [host1, host5, host6]
-    Map<String, String> secondRoundInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(firstRoundInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(firstRoundInstanceStateMap, secondRoundInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(secondRoundInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment = TableRebalancer.getNextAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
   }
 
-  private int getNumCommonInstances(Map<String, String> currentInstanceStateMap,
-      Map<String, String> nextInstanceStateMap) {
-    int numCommonInstances = 0;
-    for (String instanceId : currentInstanceStateMap.keySet()) {
-      if (nextInstanceStateMap.containsKey(instanceId)) {
-        numCommonInstances++;
-      }
-    }
-    return numCommonInstances;
+  @Test
+  public void testStrictReplicaGroup() {
+    // Current assignment:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    currentAssignment
+        .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment
+        .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+    currentAssignment
+        .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment
+        .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+    // Target assignment:
+    // {
+    //   "segment1": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+    targetAssignment
+        .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment
+        .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+    targetAssignment
+        .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment
+        .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+
+    // Next assignment with 2 minimum available replicas without strict replica-group:
+    // (This assignment will move "segment1" and "segment3" from "host3" to "host4", and move "segment2" and "segment4"
+    // from "host3" to "host1". "host1" and "host4" might be unavailable for strict replica-group routing, which breaks
+    // the minimum available replicas requirement.)
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> nextAssignment =
+        TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+
+    // Next assignment with 2 minimum available replicas with strict replica-group:
+    // (This assignment will only move "segment1" and "segment3" from "host3" to "host4". Only "host4" can be
+    // unavailable for strict replica-group routing during the rebalance, which meets the minimum available replicas
+    // requirement.)
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+
+    // Next assignment with 2 minimum available replicas with strict replica-group:
+    // {
+    //   "segment1": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }

Review comment:
       Can you also add a test for the case where no strict replica-group assignment is available but a best-effort replica-group assignment is possible?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
##########
@@ -546,22 +554,61 @@ static boolean isExternalViewConverged(String tableNameWithType,
     return true;
   }
 
-  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
-      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas) {
+  /**
+   * Returns the next assignment for the table based on the current assignment and the target assignment with regards to
+   * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
+   * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
+   * is meet. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,

Review comment:
       Nit: you can split this into two methods, one for `enableStrictReplicaGroup` and one for the non-strict case.
   
   Usually, overloaded methods should return the same type, but this overload returns a different type from the other one.
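
A rough sketch of what that split could look like follows. Every method name here (`getNextSingleSegmentAssignment`, `getNextStrictReplicaGroupAssignment`, `getNextNonStrictReplicaGroupAssignment`) is a suggestion for illustration, not what the PR contains. The per-segment step is a placeholder that jumps straight to the target and ignores the minimum available replicas requirement. The fallback to the current instance state map follows the Javadoc, since the diff is cut off at that branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class NextAssignmentSplitSketch {
  // Hypothetical holder mirroring the two fields used in the diff.
  static final class SingleSegmentAssignment {
    final Map<String, String> _instanceStateMap;
    final Set<String> _availableInstances;

    SingleSegmentAssignment(Map<String, String> instanceStateMap, Set<String> availableInstances) {
      _instanceStateMap = instanceStateMap;
      _availableInstances = availableInstances;
    }
  }

  // Helper for the example: every given host is ONLINE.
  static Map<String, String> online(String... hosts) {
    Map<String, String> map = new TreeMap<>();
    for (String host : hosts) {
      map.put(host, "ONLINE");
    }
    return map;
  }

  // Placeholder per-segment step (assumption): moves directly to the target and reports the
  // instances shared by current and target as available. The PR's real logic is not reproduced.
  static SingleSegmentAssignment getNextSingleSegmentAssignment(Map<String, String> current,
      Map<String, String> target) {
    Set<String> available = new TreeSet<>(current.keySet());
    available.retainAll(target.keySet());
    return new SingleSegmentAssignment(new TreeMap<>(target), available);
  }

  static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssignment(
      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment) {
    Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      nextAssignment.put(segmentName,
          getNextSingleSegmentAssignment(entry.getValue(), targetAssignment.get(segmentName))._instanceStateMap);
    }
    return nextAssignment;
  }

  static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
      int minAvailableReplicas) {
    Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
    Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      SingleSegmentAssignment assignment =
          getNextSingleSegmentAssignment(entry.getValue(), targetAssignment.get(segmentName));
      Set<String> assignedInstances = assignment._instanceStateMap.keySet();
      Set<String> availableInstances = assignment._availableInstances;
      Set<String> tracked = availableInstancesMap.get(assignedInstances);
      if (tracked != null) {
        availableInstances.retainAll(tracked);
      }
      if (tracked == null || availableInstances.size() >= minAvailableReplicas) {
        nextAssignment.put(segmentName, assignment._instanceStateMap);
        availableInstancesMap.put(assignedInstances, availableInstances);
      } else {
        // Requirement would break: keep the current instance state map for this segment.
        nextAssignment.put(segmentName, entry.getValue());
      }
    }
    return nextAssignment;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> current = new TreeMap<>();
    current.put("segment1", online("h1", "h2", "h3"));
    current.put("segment2", online("h2", "h4", "h5"));
    Map<String, Map<String, String>> target = new TreeMap<>();
    target.put("segment1", online("h1", "h2", "h4"));
    target.put("segment2", online("h1", "h2", "h4"));
    System.out.println(getNextNonStrictReplicaGroupAssignment(current, target));
    System.out.println(getNextStrictReplicaGroupAssignment(current, target, 2));
  }
}
```

With distinct names, both table-level methods return the same map type, and only the per-segment helper returns `SingleSegmentAssignment`.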




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


