Add cluster-level and resource-level config options to disable delayed rebalance for the entire cluster or for an individual resource.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c5e12b1e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c5e12b1e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c5e12b1e Branch: refs/heads/helix-0.6.x Commit: c5e12b1e6b7aa7f8b2e43e4878be3f4c3de81f82 Parents: a294ab2 Author: Lei Xia <l...@linkedin.com> Authored: Wed Sep 21 10:53:31 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Wed Feb 8 09:54:08 2017 -0800 ---------------------------------------------------------------------- .../rebalancer/DelayedAutoRebalancer.java | 80 +++++++++++++----- .../rebalancer/util/RebalanceScheduler.java | 6 +- .../org/apache/helix/model/ClusterConfig.java | 12 ++- .../java/org/apache/helix/model/IdealState.java | 21 ++++- .../helix/model/builder/IdealStateBuilder.java | 25 +++++- .../integration/TestDelayedAutoRebalance.java | 89 ++++++++++++++++++++ .../integration/ZkIntegrationTestBase.java | 12 +++ 7 files changed, 217 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 1e127bc..d1718fc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -24,6 +24,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.ClusterDataCache; import 
org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; @@ -80,10 +81,12 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { allNodes = clusterData.getEnabledInstances(); } + ClusterConfig clusterConfig = clusterData.getClusterConfig(); + long delayTime = getRebalanceDelay(currentIdealState, clusterConfig); Set<String> activeNodes = getActiveInstances(currentIdealState, allNodes, liveNodes, - clusterData.getInstanceOfflineTimeMap()); - - setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap()); + clusterData.getInstanceOfflineTimeMap(), delayTime, clusterConfig); + setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap(), + delayTime, clusterConfig); if (allNodes.isEmpty() || activeNodes.isEmpty()) { LOG.error(String.format( @@ -105,8 +108,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { emptyMapping(currentIdealState)); } - int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount); - LinkedHashMap<String, Integer> stateCountMap = StateModelDefinition.getStateCountMap(stateModelDef, activeNodes.size(), replicaCount); Map<String, Map<String, String>> currentMapping = @@ -120,18 +121,25 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { // sort node lists to ensure consistent preferred assignments List<String> allNodeList = new ArrayList<String>(allNodes); List<String> liveNodeList = new ArrayList<String>(liveNodes); - List<String> activeNodeList = new ArrayList<String>(activeNodes); Collections.sort(allNodeList); Collections.sort(liveNodeList); - Collections.sort(activeNodeList); ZNRecord newIdealMapping = _rebalanceStrategy .computePartitionAssignment(allNodeList, liveNodeList, currentMapping, clusterData); - ZNRecord newActiveMapping = _rebalanceStrategy - 
.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData); - ZNRecord finalMapping = - getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes, - replicaCount, minActiveReplicas); + ZNRecord finalMapping = newIdealMapping; + + if (!isDelayRebalanceDisabled(currentIdealState, clusterConfig)) { + List<String> activeNodeList = new ArrayList<String>(activeNodes); + Collections.sort(activeNodeList); + int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount); + + ZNRecord newActiveMapping = _rebalanceStrategy + .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData); + finalMapping = + getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes, + replicaCount, minActiveReplicas); + LOG.debug("newActiveMapping: " + newActiveMapping); + } if (LOG.isDebugEnabled()) { LOG.debug("currentMapping: " + currentMapping); @@ -140,7 +148,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { LOG.debug("allNodes: " + allNodes); LOG.debug("maxPartition: " + maxPartition); LOG.debug("newIdealMapping: " + newIdealMapping); - LOG.debug("newActiveMapping: " + newActiveMapping); LOG.debug("finalMapping: " + finalMapping); } @@ -159,13 +166,18 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { /* get all active instances (live instances plus offline-yet-active instances */ private Set<String> getActiveInstances(IdealState idealState, Set<String> allNodes, - Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap) { + Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap, long delayTime, + ClusterConfig clusterConfig) { Set<String> activeInstances = new HashSet<String>(liveNodes); + + if (isDelayRebalanceDisabled(idealState, clusterConfig)) { + return activeInstances; + } + Set<String> offlineInstances = new HashSet<String>(allNodes); offlineInstances.removeAll(liveNodes); long currentTime = 
System.currentTimeMillis(); - long delayTime = idealState.getRebalanceDelay(); for (String ins : offlineInstances) { Long offlineTime = instanceOfflineTimeMap.get(ins); if (offlineTime != null && offlineTime > 0) { @@ -180,10 +192,14 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { /* Set a rebalance scheduler for the closest future rebalance time. */ private void setRebalanceScheduler(IdealState idealState, Set<String> activeInstances, - Map<String, Long> instanceOfflineTimeMap) { - long nextRebalanceTime = Long.MAX_VALUE; - long delayTime = idealState.getRebalanceDelay(); + Map<String, Long> instanceOfflineTimeMap, long delayTime, ClusterConfig clusterConfig) { + String resourceName = idealState.getResourceName(); + if (isDelayRebalanceDisabled(idealState, clusterConfig)) { + _scheduledRebalancer.removeScheduledRebalance(resourceName); + return; + } + long nextRebalanceTime = Long.MAX_VALUE; for (String ins : activeInstances) { Long offlineTime = instanceOfflineTimeMap.get(ins); if (offlineTime != null && offlineTime > 0) { @@ -197,16 +213,31 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { } } - String resourceName = idealState.getResourceName(); - LOG.debug(String - .format("Next rebalance time for resource %s is %d\n", resourceName, nextRebalanceTime)); if (nextRebalanceTime == Long.MAX_VALUE) { - _scheduledRebalancer.removeScheduledRebalance(resourceName); + long startTime = _scheduledRebalancer.removeScheduledRebalance(resourceName); + LOG.debug(String + .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime)); } else { _scheduledRebalancer.scheduleRebalance(_manager, resourceName, nextRebalanceTime); + LOG.debug(String.format("Set next rebalance time for resource %s at time %d\n", resourceName, + nextRebalanceTime)); } } + private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) { + long delayTime = idealState.getRebalanceDelay(); + if (delayTime < 0) { + 
delayTime = clusterConfig.getRebalanceDelayTime(); + } + return delayTime; + } + + private boolean isDelayRebalanceDisabled(IdealState idealState, ClusterConfig clusterConfig) { + long delayTime = getRebalanceDelay(idealState, clusterConfig); + return (delayTime < 0 || idealState.isDelayRebalanceDisabled() || clusterConfig + .isDelayRebalaceDisabled()); + } + private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping, ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) { if (minActiveReplica >= numReplica) { @@ -274,8 +305,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { Set<String> offlineNodes = cache.getAllInstances(); offlineNodes.removeAll(cache.getLiveInstances().keySet()); + ClusterConfig clusterConfig = cache.getClusterConfig(); + long delayTime = getRebalanceDelay(idealState, clusterConfig); Set<String> activeNodes = - getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap()); + getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap(), + delayTime, clusterConfig); String stateModelDefName = idealState.getStateModelDefRef(); StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName); http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java index bbc03d0..641c755 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java @@ -95,7 +95,7 @@ public class RebalanceScheduler { * * @param 
resource */ - public void removeScheduledRebalance(String resource) { + public long removeScheduledRebalance(String resource) { ScheduledTask existTask = _rebalanceTasks.remove(resource); if (existTask != null && !existTask.getFuture().isDone()) { if (!existTask.getFuture().cancel(true)) { @@ -104,7 +104,11 @@ public class RebalanceScheduler { LOG.info( "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: " + resource); + + return existTask.getStartTime(); } + + return -1; } /** http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 7b30fd7..23d66a4 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -33,7 +33,9 @@ public class ClusterConfig extends HelixProperty { HELIX_DISABLE_PIPELINE_TRIGGERS, TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" PERSIST_BEST_POSSIBLE_ASSIGNMENT, - FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition. + FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition. + DELAY_REBALANCE_DISABLED, // if true, delayed rebalancing is disabled for the whole cluster. + DELAY_REBALANCE_TIME // cluster-wide delay (in ms) Helix waits after a node goes offline before rebalancing; used when a resource does not set its own delay.
} /** @@ -73,6 +75,14 @@ public class ClusterConfig extends HelixProperty { .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false); } + public long getRebalanceDelayTime() { + return _record.getLongField(ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), -1); + } + + public boolean isDelayRebalaceDisabled() { + return _record.getBooleanField(ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(), false); + } + @Override public boolean equals(Object obj) { if (obj instanceof ClusterConfig) { http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 769a369..5ced7a6 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -52,6 +52,7 @@ public class IdealState extends HelixProperty { REPLICAS, MIN_ACTIVE_REPLICAS, REBALANCE_DELAY, + DELAY_REBALANCE_DISABLED, @Deprecated IDEAL_STATE_MODE, REBALANCE_MODE, @@ -204,7 +205,7 @@ public class IdealState extends HelixProperty { * @param delayInMilliseconds */ public void setRebalanceDelay(long delayInMilliseconds) { - _record.setLongField(IdealStateProperty.REBALANCE_DELAY.toString(), delayInMilliseconds); + _record.setLongField(IdealStateProperty.REBALANCE_DELAY.name(), delayInMilliseconds); } /** @@ -212,7 +213,23 @@ public class IdealState extends HelixProperty { * @return */ public long getRebalanceDelay() { - return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.toString(), 0); + return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.name(), -1); + } + + /** + * If disabled is true, the delayed rebalance time will be ignored. 
+ * @param disabled true to ignore the rebalance delay for this resource + */ + public void setDelayRebalanceDisabled(boolean disabled) { + _record.setBooleanField(IdealStateProperty.DELAY_REBALANCE_DISABLED.name(), disabled); + } + + /** + * Whether the delay rebalance is disabled. + * @return true if delayed rebalance is disabled for this resource + */ + public boolean isDelayRebalanceDisabled() { + return _record.getBooleanField(IdealStateProperty.DELAY_REBALANCE_DISABLED.name(), false); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java index e3000c2..09a7528 100644 --- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java @@ -51,6 +51,11 @@ public abstract class IdealStateBuilder { private long rebalanceDelayInMs = -1; /** + * Whether delay rebalance should be disabled. + */ + private Boolean delayRebalanceDisabled = null; + + /** * State model that is applicable for this resource */ private String stateModel; @@ -135,6 +140,20 @@ public abstract class IdealStateBuilder { } /** + * Disable Delayed Rebalance. + */ + public void disableDelayRebalance() { + delayRebalanceDisabled = true; + } + + /** + * Enable Delayed Rebalance.
+ */ + public void enableDelayRebalance() { + delayRebalanceDisabled = false; + } + + /** * @param numPartitions */ public IdealStateBuilder setNumPartitions(int numPartitions) { @@ -272,10 +291,14 @@ public abstract class IdealStateBuilder { idealstate.enableGroupRouting(enableGroupRouting); } - if (rebalanceDelayInMs > 0) { + if (rebalanceDelayInMs >= 0) { idealstate.setRebalanceDelay(rebalanceDelayInMs); } + if (delayRebalanceDisabled != null) { + idealstate.setDelayRebalanceDisabled(delayRebalanceDisabled); + } + if (!idealstate.isValid()) { throw new HelixException("invalid ideal-state: " + idealstate); } http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java index ba7f46e..6342d13 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java @@ -234,6 +234,95 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { } } + @Test + public void testDisableResourceDelayRebalance() throws Exception { + Map<String, IdealState> idealStates = new HashMap<String, IdealState>(); + Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>(); + + int minActiveReplica = _replica - 1; + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + IdealState idealState = + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, + minActiveReplica, 100000); + _testDBs.add(db); + idealStates.put(db, idealState); + } + + Thread.sleep(1000); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _testDBs) { 
+ ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViewsBefore.put(db, ev); + } + + // bring down one node, no partition should be moved. + _participants.get(0).syncStop(); + Thread.sleep(1000); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _testDBs) { + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica); + validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev, + _participants.get(0).getInstanceName()); + } + + // disable delay rebalance for one db, partition should be moved immediately + String testDb = _testDBs.get(0); + IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState( + CLUSTER_NAME, testDb); + idealState.setDelayRebalanceDisabled(true); + _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState); + Thread.sleep(1000); + + // once delay rebalance is disabled, it should maintain required number of replicas. 
+ ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, testDb); + idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb); + validateMinActiveAndTopStateReplica(idealState, ev, _replica); + } + + @Test + public void testDisableDelayRebalanceInCluster() throws Exception { + Map<String, IdealState> idealStates = new HashMap<String, IdealState>(); + disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + + int minActiveReplica = _replica - 1; + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + IdealState idealState = + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, + minActiveReplica, 100000); + _testDBs.add(db); + idealStates.put(db, idealState); + } + + Thread.sleep(1000); + Assert.assertTrue(_clusterVerifier.verify()); + + // bring down one node, no partition should be moved. + _participants.get(0).syncStop(); + Thread.sleep(1000); + Assert.assertTrue(_clusterVerifier.verify()); + + // disable delay rebalance for the entire cluster. 
+ disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + Thread.sleep(1000); + for (String db : _testDBs) { + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica); + } + + disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false); + } + @AfterMethod public void afterTest() { // delete all DBs create in last test http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java index 9810f81..0edd4d3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java @@ -114,4 +114,16 @@ public class ZkIntegrationTestBase { ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), enable.toString()); } + + protected void disableDelayRebalanceInCluster(ZkClient zkClient, String clusterName, + Boolean disabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + HelixConfigScope clusterScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(clusterName).build(); + + configAccessor + .set(clusterScope, ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(), + disabled.toString()); + } }