siddharthteotia commented on a change in pull request #4446: Add support in the 
rebalancer for the user to provide minimum number of serving replicas
URL: https://github.com/apache/incubator-pinot/pull/4446#discussion_r309762640
 
 

 ##########
 File path: 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/TableRebalancerTest.java
 ##########
 @@ -112,9 +106,322 @@ public void downtimeUpdateWithNoCommonElements() {
 
     Map<String, String> srcMap = current.getRecord().getMapField(segmentId);
     TableRebalancer updater = new TableRebalancer(null, null, null);
-    updater.updateSegmentIfNeeded(segmentId, srcMap, targetMap, current, 
downtime);
+    updater.updateSegmentIfNeeded(segmentId, srcMap, targetMap, current, true, 
1);
 
     Map<String, String> tempMap = current.getRecord().getMapField(segmentId);
     Assert.assertTrue(EqualityUtils.isEqual(tempMap, targetMap));
   }
+
+  /**
+   * Test for no downtime rebalance with no common hosts between
+   * current and target ideal state and a request to keep minimum
+   * 2 replicas up while rebalancing
+   */
+  @Test
+  public void noDowntimeUpdateWithNoCommonHostsAndTwoMinReplicas() {
+    IdealState currentIdealState = new IdealState("rebalance");
+    currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+    currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+    currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+    IdealState targetIdealState = new IdealState("rebalance");
+    targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+
+    // copying the current ideal state here to mimic how
+    // TableRebalancer.rebalance() works between multiple invocations to
+    // updateSegmentIfNeeded. In other words, the latter function updates the
+    // passed in copy of current ideal state, returns and the updated state is
+    // then written into ZK as the latest current ideal state. Before
+    // calling updateSegmentIfNeeded() again, the rebalance() method gets
+    // the current ideal state from ZK
+    IdealState toUpdateIdealState = 
HelixHelper.cloneIdealState(currentIdealState);
+
+    final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+    final Map<String, String> targetSegmentInstancesMap = 
targetIdealState.getInstanceStateMap(segmentId);
+
+    // STEP 1
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+    // After step 1, let's assume that
+    // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, 
host3 : online } }
+    // Essentially, host1 from current state got replaced with host4 from 
target state
+    // Now, what we are checking is that between toUpdate and target ideal 
state,
+    // there is exactly 1 host in common
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
1, true);
+    currentIdealState = toUpdateIdealState;
+
+    // STEP 2
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+    // Now if the logic to keep up minimum number of serving replicas is doing
+    // the right thing, in step 2 it should not have simply updated the ideal
+    // state to target in one step simply because there is one common element 
(host4).
+    // The reason being if we directly do this update, then we are not 
guaranteeing
+    // that minimum 2 replicas should be kept up as in direct update host4
+    // is the only one that might be up for serving queries. So right thing to 
do
+    // is what happened in step 1 -- replace a host in current ideal state with
+    // a host from target ideal state. This will ensure that 2 serving 
replicas are up.
+    //
+    // toUpdateIdealState : { SEGMENT1 : { host4 : online, host5 : online, 
host3 : online } }
+    // Essentially, host2 from current state got replaced with host5
+    // Now, what we are checking is that between toUpdate and target ideal 
state,
+    // there are exactly 2 hosts in common and not 3
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
2, true);
+    currentIdealState = toUpdateIdealState;
+
+    // STEP 3
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+    // At this point, it would be perfectly fine to just directly set the ideal
+    // state to target by updating the segment's partition map since the number
+    // of hosts in common >= min serving replicas we need
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
3, true);
+
+    // Verify the same behavior (as described in above steps) through stats
+    TableRebalancer.RebalancerStats rebalancerStats = 
rebalancer.getRebalancerStats();
+    // STEP 1 and STEP 2 -- incremental updates to segment's partition map
+    
Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(),
 2);
+    // STEP 3 -- final direct update to segment's partition map
+    
Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+  }
+
+  /**
+   * Test for no downtime rebalance with no common hosts between
+   * current and target ideal state and a request to keep minimum
+   * 2 replicas up while rebalancing along with the increase in
+   * replicas in the target ideal state
+   */
+  @Test
+  public void noDowntimeUpdateWithNoCommonHostsAndIncreasedReplicas() {
+    IdealState currentIdealState = new IdealState("rebalance");
+    currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+    currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+    currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+    IdealState targetIdealState = new IdealState("rebalance");
+    targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host7", "ONLINE");
+
+    IdealState toUpdateIdealState = 
HelixHelper.cloneIdealState(currentIdealState);
+
+    final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+    final Map<String, String> targetSegmentInstancesMap = 
targetIdealState.getInstanceStateMap(segmentId);
+
+    // STEP 1
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+    // After step 1, let's assume that
+    // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, 
host3 : online } }
+    // Essentially, host1 from current state got replaced with host4 from 
target state
+    // Now, what we are checking is that between toUpdate and target ideal 
state,
+    // there is exactly 1 host in common
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
1, false);
+    currentIdealState = toUpdateIdealState;
+
+    // STEP 2
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+    // another incremental transition to keep up with 2 serving replicas 
requirement
+    // toUpdateIdealState : { SEGMENT1 : { host4 : online, host5 : online, 
host3 : online } }
+    // Essentially, host2 from current state got replaced with host5
+    // Now, what we are checking is that between toUpdate and target ideal 
state,
+    // there are exactly 2 hosts in common and not 3
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
2, false);
+    currentIdealState = toUpdateIdealState;
+
+    // STEP 3
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+    // At this point, it would be perfectly fine to just directly set the 
partition
+    // map in ideal state to target since the number of hosts in common >= min 
serving
+    // replicas we need
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
4, true);
+
+    // Verify the same behavior (as described in above steps) through stats
+    TableRebalancer.RebalancerStats rebalancerStats = 
rebalancer.getRebalancerStats();
+    // STEP 1 and STEP 2 -- incremental updates to segment's partition map
+    
Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(),
 2);
+    // STEP 3 -- final direct update to segment's partition map
+    
Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+  }
+
+  /**
+   * Test for no downtime rebalance with no common hosts between
+   * current and target ideal state and a request to keep minimum
+   * 1 replica up while rebalancing
+   */
+  @Test
+  public void noDowntimeUpdateWithNoCommonHostsAndOneMinReplica() {
+    IdealState currentIdealState = new IdealState("rebalance");
+    currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+    currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+    currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+    IdealState targetIdealState = new IdealState("rebalance");
+    targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+    targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+
+    IdealState toUpdateIdealState = 
HelixHelper.cloneIdealState(currentIdealState);
+
+    final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+    final Map<String, String> targetSegmentInstancesMap = 
targetIdealState.getInstanceStateMap(segmentId);
+
+    // STEP 1
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 1);
+    // After step 1, let's assume that
+    // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, 
host3 : online } }
+    // Essentially, host1 from current state got replaced with host4
+    // Now, what we are checking is that between toUpdate and target ideal 
state,
+    // there is exactly 1 host in common
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
1, true);
+    currentIdealState = toUpdateIdealState;
+
+    // STEP 2
+    rebalancer.updateSegmentIfNeeded(segmentId, 
currentIdealState.getRecord().getMapField(segmentId),
+        targetSegmentInstancesMap, toUpdateIdealState, false, 1);
+    // Now since the minimum number of serving replicas we need is 1, it is 
fine
+    // to directly (in one step) set the partition mapping to target by 
replacing
+    // host2 and host3 from current state with host5 and host6 from target 
state
+    // Doing this direct update still ensures that 1 replica (host4) is up for 
serving
+    //
+    // toUpdateIdealState : { SEGMENT1 : { host4 : online, host5 : online, 
host6 : online } }
+    // Essentially, host2, host3 from current state got replaced with host5 
and host6
+    // Now, what we are checking is that between toUpdate and target ideal 
state,
+    // all hosts are in common
+    verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 
3, true);
+
+    TableRebalancer.RebalancerStats rebalancerStats = 
rebalancer.getRebalancerStats();
+    // STEP 1 -- incremental update to segment's partition map
+    
Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(),
 1);
+    // STEP 2 -- final direct update to segment's partition map
+    
Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+  }
+
+  /**
+   * Test for no downtime rebalance with no common hosts between
+   * current and target ideal state and a request to keep an invalid
+   * number of minimum replicas up while rebalancing
+   */
+  @Test
+  public void noDowntimeUpdateWithNoCommonHostsAndInvalidMinReplicas() {
 
 Review comment:
   This test file doesn't exercise the full code path in TableRebalancer. It was 
written that way originally. It directly calls the updateSegment() method, which 
just updates the ideal state -- the check is still done there to safeguard the 
code. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to