siddharthteotia commented on a change in pull request #4446: Add support in the
rebalancer for the user to provide minimum number of serving replicas
URL: https://github.com/apache/incubator-pinot/pull/4446#discussion_r309761581
##########
File path:
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/TableRebalancerTest.java
##########
@@ -112,9 +106,322 @@ public void downtimeUpdateWithNoCommonElements() {
Map<String, String> srcMap = current.getRecord().getMapField(segmentId);
TableRebalancer updater = new TableRebalancer(null, null, null);
- updater.updateSegmentIfNeeded(segmentId, srcMap, targetMap, current, downtime);
+ updater.updateSegmentIfNeeded(segmentId, srcMap, targetMap, current, true, 1);
Map<String, String> tempMap = current.getRecord().getMapField(segmentId);
Assert.assertTrue(EqualityUtils.isEqual(tempMap, targetMap));
}
+
+ /**
+ * Test for no downtime rebalance with no common hosts between
+ * current and target ideal state and a request to keep a minimum of
+ * 2 replicas up while rebalancing
+ */
+ @Test
+ public void noDowntimeUpdateWithNoCommonHostsAndTwoMinReplicas() {
+ IdealState currentIdealState = new IdealState("rebalance");
+ currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+ IdealState targetIdealState = new IdealState("rebalance");
+ targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+
+ // copying the current ideal state here to mimic how
+ // TableRebalancer.rebalance() works between multiple invocations of
+ // updateSegmentIfNeeded(). In other words, the latter method updates the
+ // passed-in copy of the current ideal state and returns; the updated state is
+ // then written into ZK as the latest current ideal state. Before calling
+ // updateSegmentIfNeeded() again, the rebalance() method gets
+ // the current ideal state from ZK
+ IdealState toUpdateIdealState = HelixHelper.cloneIdealState(currentIdealState);
+
+ final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+ final Map<String, String> targetSegmentInstancesMap = targetIdealState.getInstanceStateMap(segmentId);
+
+ // STEP 1
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // After step 1, let's assume that
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, host3 : online } }
+ // Essentially, host1 from current state got replaced with host4 from target state
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there is exactly 1 host in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 1, true);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 2
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // Now if the logic to keep up the minimum number of serving replicas is doing
+ // the right thing, in step 2 it should not have simply updated the ideal
+ // state to target in one step just because there is one common element (host4).
+ // The reason is that if we directly did this update, we would not be guaranteeing
+ // that a minimum of 2 replicas is kept up, since after a direct update host4
+ // is the only one that might be up for serving queries. So the right thing to do
+ // is what happened in step 1 -- replace a host in the current ideal state with
+ // a host from the target ideal state. This ensures that 2 serving replicas are up.
+ //
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host5 : online, host3 : online } }
+ // Essentially, host2 from current state got replaced with host5
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there are exactly 2 hosts in common and not 3
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 2, true);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 3
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // At this point, it would be perfectly fine to just directly set the ideal
+ // state to target by updating the segment's partition map since the number
+ // of hosts in common >= min serving replicas we need
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 3, true);
+
+ // Verify the same behavior (as described in above steps) through stats
+ TableRebalancer.RebalancerStats rebalancerStats = rebalancer.getRebalancerStats();
+ // STEP 1 and STEP 2 -- incremental updates to segment's partition map
+ Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(), 2);
+ // STEP 3 -- final direct update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+ }
+
+ /**
+ * Test for no downtime rebalance with no common hosts between
+ * current and target ideal state and a request to keep a minimum of
+ * 2 replicas up while rebalancing, along with an increase in the
+ * number of replicas in the target ideal state
+ */
+ @Test
+ public void noDowntimeUpdateWithNoCommonHostsAndIncreasedReplicas() {
+ IdealState currentIdealState = new IdealState("rebalance");
+ currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+ IdealState targetIdealState = new IdealState("rebalance");
+ targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host7", "ONLINE");
+
+ IdealState toUpdateIdealState = HelixHelper.cloneIdealState(currentIdealState);
+
+ final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+ final Map<String, String> targetSegmentInstancesMap = targetIdealState.getInstanceStateMap(segmentId);
+
+ // STEP 1
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // After step 1, let's assume that
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, host3 : online } }
+ // Essentially, host1 from current state got replaced with host4 from target state
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there is exactly 1 host in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 1, false);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 2
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // another incremental transition to keep up with the 2 serving replicas requirement
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host5 : online, host3 : online } }
+ // Essentially, host2 from current state got replaced with host5
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there are exactly 2 hosts in common and not 3
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 2, false);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 3
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // At this point, it would be perfectly fine to just directly set the partition
+ // map in the ideal state to target since the number of hosts in common >= min serving
+ // replicas we need
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 4, true);
+
+ // Verify the same behavior (as described in above steps) through stats
+ TableRebalancer.RebalancerStats rebalancerStats = rebalancer.getRebalancerStats();
+ // STEP 1 and STEP 2 -- incremental updates to segment's partition map
+ Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(), 2);
+ // STEP 3 -- final direct update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+ }
+
+ /**
+ * Test for no downtime rebalance with no common hosts between
+ * current and target ideal state and a request to keep a minimum of
+ * 1 replica up while rebalancing
+ */
+ @Test
+ public void noDowntimeUpdateWithNoCommonHostsAndOneMinReplica() {
+ IdealState currentIdealState = new IdealState("rebalance");
+ currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+ IdealState targetIdealState = new IdealState("rebalance");
+ targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+
+ IdealState toUpdateIdealState = HelixHelper.cloneIdealState(currentIdealState);
+
+ final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+ final Map<String, String> targetSegmentInstancesMap = targetIdealState.getInstanceStateMap(segmentId);
+
+ // STEP 1
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 1);
+ // After step 1, let's assume that
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, host3 : online } }
+ // Essentially, host1 from current state got replaced with host4
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there is exactly 1 host in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 1, true);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 2
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 1);
+ // Now since the minimum number of serving replicas we need is 1, it is fine
+ // to directly (in one step) set the partition mapping to target by replacing
+ // host2 and host3 from current state with host5 and host6 from target state.
+ // Doing this direct update still ensures that 1 replica (host4) is up for serving
+ //
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host5 : online, host6 : online } }
+ // Essentially, host2 and host3 from current state got replaced with host5 and host6
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // all hosts are in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 3, true);
+
+ TableRebalancer.RebalancerStats rebalancerStats = rebalancer.getRebalancerStats();
+ // STEP 1 -- incremental update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(), 1);
+ // STEP 2 -- final direct update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+ }
+
+ /**
+ * Test for no downtime rebalance with no common hosts between
+ * current and target ideal state and a request to keep an invalid
+ * number of minimum replicas up while rebalancing
+ */
+ @Test
+ public void noDowntimeUpdateWithNoCommonHostsAndInvalidMinReplicas() {
+ IdealState currentIdealState = new IdealState("rebalance");
+ currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+ IdealState targetIdealState = new IdealState("rebalance");
+ targetIdealState.setPartitionState(segmentId, "host4", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+
+ IdealState toUpdateIdealState = HelixHelper.cloneIdealState(currentIdealState);
+
+ final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+ final Map<String, String> targetSegmentInstancesMap = targetIdealState.getInstanceStateMap(segmentId);
+
+ // STEP 1
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 4);
+ // After step 1, let's assume that
+ // toUpdateIdealState : { SEGMENT1 : { host4 : online, host2 : online, host3 : online } }
+ // Essentially, host1 from current state got replaced with host4
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there is exactly 1 host in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 1, true);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 2
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 4);
+ // Now since the minimum number of serving replicas we asked for is 4,
+ // the rebalancer will detect that it is impossible to keep up 4 replicas
+ // since the segment only has 3 replicas in the current configuration. So
+ // it is fine to directly set the segment's partition map to target in this
+ // step by replacing host2 and host3 from current state with host5 and host6
+ // from target state. Doing this direct update still ensures that
+ // 1 replica (host4) is up for serving
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // all hosts are in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 3, true);
+
+ TableRebalancer.RebalancerStats rebalancerStats = rebalancer.getRebalancerStats();
+ // STEP 1 -- incremental update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(), 1);
+ // STEP 2 -- final direct update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+ }
+
+ /**
+ * Test for no downtime rebalance with common hosts between
+ * current and target ideal state and a request to keep a minimum of
+ * 2 replicas up while rebalancing
+ */
+ @Test
+ public void noDowntimeUpdateWithCommonHostsAndMinReplicas() {
+ IdealState currentIdealState = new IdealState("rebalance");
+ currentIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host2", "ONLINE");
+ currentIdealState.setPartitionState(segmentId, "host3", "ONLINE");
+
+ IdealState targetIdealState = new IdealState("rebalance");
+ targetIdealState.setPartitionState(segmentId, "host1", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host5", "ONLINE");
+ targetIdealState.setPartitionState(segmentId, "host6", "ONLINE");
+
+ IdealState toUpdateIdealState = HelixHelper.cloneIdealState(currentIdealState);
+
+ final TableRebalancer rebalancer = new TableRebalancer(null, null, null);
+ final Map<String, String> targetSegmentInstancesMap = targetIdealState.getInstanceStateMap(segmentId);
+
+ // STEP 1
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // After step 1, let's assume that
+ // toUpdateIdealState : { SEGMENT1 : { host1 : online, host5 : online, host3 : online } }
+ // Essentially, host2 from current state got replaced with host5
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // there are exactly 2 common hosts since we already had 1 common host to begin with
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 2, true);
+ currentIdealState = toUpdateIdealState;
+
+ // STEP 2
+ rebalancer.updateSegmentIfNeeded(segmentId, currentIdealState.getRecord().getMapField(segmentId),
+ targetSegmentInstancesMap, toUpdateIdealState, false, 2);
+ // Now since the minimum number of serving replicas we asked for is 2,
+ // we don't need to update the segment's partition map incrementally. Since
+ // the common hosts (host1 and host5) satisfy the requirement of minimum
+ // 2 serving replicas, the partition map can be set to target in one go.
+ // Now, what we are checking is that between toUpdate and target ideal state,
+ // all hosts are in common
+ verifyStateForMinReplicaConstraint(toUpdateIdealState, targetIdealState, 3, true);
+
+ TableRebalancer.RebalancerStats rebalancerStats = rebalancer.getRebalancerStats();
+ // STEP 1 -- incremental update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getIncrementalUpdatesToSegmentInstanceMap(), 1);
+ // STEP 2 -- final direct update to segment's partition map
+ Assert.assertEquals(rebalancerStats.getDirectUpdatesToSegmentInstanceMap(), 1);
+ }
+
+ private void verifyStateForMinReplicaConstraint(
+ final IdealState updated, final IdealState target,
+ final int same, final boolean sizeCheck) {
Review comment:
Done
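
For readers following the five tests above, here is a minimal, self-contained sketch of the decision rule they appear to exercise: jump straight to the target instance map once enough hosts are already common between current and target, otherwise swap one host at a time. The class and method names below (MinReplicaUpdateSketch, nextStep) are hypothetical illustrations, not the actual TableRebalancer.updateSegmentIfNeeded API, and the fallback to keeping a single replica up when the requested minimum exceeds the replication factor is an assumption inferred from the "invalid min replicas" test.

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public final class MinReplicaUpdateSketch {

  /**
   * Computes the next instance-state map for a segment while honoring a
   * minimum number of serving replicas. Direct update: if current and target
   * already share at least the required number of hosts, return the target
   * map. Incremental update: otherwise replace a single current-only host
   * with a single target-only host.
   */
  static Map<String, String> nextStep(Map<String, String> current,
      Map<String, String> target, int minReplicasToKeepUp) {
    // ASSUMPTION: an unachievable request (min >= current replication)
    // degenerates to keeping 1 replica up, mirroring the test where 4
    // replicas are requested but the segment only has 3.
    int required = minReplicasToKeepUp >= current.size() ? 1 : minReplicasToKeepUp;

    Set<String> common = new LinkedHashSet<>(current.keySet());
    common.retainAll(target.keySet());
    if (common.size() >= required) {
      // Direct update: enough common hosts stay online throughout.
      return new LinkedHashMap<>(target);
    }

    // Incremental update: evict one host absent from target, add one host
    // absent from current; the untouched hosts keep serving.
    Map<String, String> next = new LinkedHashMap<>(current);
    for (String host : current.keySet()) {
      if (!target.containsKey(host)) {
        next.remove(host);
        break;
      }
    }
    for (Map.Entry<String, String> entry : target.entrySet()) {
      if (!current.containsKey(entry.getKey())) {
        next.put(entry.getKey(), entry.getValue());
        break;
      }
    }
    return next;
  }

  // Walks the first test's scenario: {host1..3} -> {host4..6} with min 2.
  public static void main(String[] args) {
    Map<String, String> current = new LinkedHashMap<>();
    current.put("host1", "ONLINE");
    current.put("host2", "ONLINE");
    current.put("host3", "ONLINE");
    Map<String, String> target = new LinkedHashMap<>();
    target.put("host4", "ONLINE");
    target.put("host5", "ONLINE");
    target.put("host6", "ONLINE");
    while (!current.keySet().equals(target.keySet())) {
      current = nextStep(current, target, 2);
      System.out.println(current.keySet());
      // Prints [host2, host3, host4], then [host3, host4, host5],
      // then [host4, host5, host6]: two incremental steps, one direct.
    }
  }
}

Run against each scenario above, this rule reproduces the incremental/direct step counts that the RebalancerStats assertions check, though the real rebalancer may of course order hosts and handle states differently.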