This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 7cc2a830a72b162f6bbf29dcd446cc12ded60cd5 Author: Harry Zhang <[email protected]> AuthorDate: Mon Nov 26 17:10:02 2018 -0800 drop instance should retry upon NotEmptyException --- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 24 +++++++++++++++++++++- .../java/org/apache/helix/task/JobRebalancer.java | 1 - .../java/org/apache/helix/task/TaskRebalancer.java | 10 ++++++--- .../helix/util/WeightAwareRebalanceUtil.java | 1 - .../stages/TestIntermediateStateCalcStage.java | 4 +--- .../controller/stages/TestRecoveryLoadBalance.java | 2 +- 6 files changed, 32 insertions(+), 10 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index ad8a9a2..19b713c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.DataUpdater; +import org.I0Itec.zkclient.exception.ZkException; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; @@ -76,6 +77,7 @@ import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.DefaultIdealStateCalculator; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.RebalanceUtil; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +155,27 @@ public class ZKHelixAdmin implements HelixAdmin { ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); // delete instance path - _zkClient.deleteRecursively(instancePath); + int retryCnt = 0; + while (true) { + try { + _zkClient.deleteRecursively(instancePath); + return; + } catch (HelixException e) { + if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause() + .getCause() instanceof KeeperException.NotEmptyException) { + // Racing condition with controller's persisting node history, retryable. + // We don't need to backoff here as this racing condition only happens once (controller + // does not repeatedly write instance history) + logger.warn("Retrying dropping instance {} with exception {}", + instanceConfig.getInstanceName(), e.getCause().getMessage()); + retryCnt ++; + } else { + logger.error("Failed to drop instance {} (not retryable).", + instanceConfig.getInstanceName(), e.getCause()); + throw e; + } + } + } } @Override diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 334b605..0bd93fe 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -40,7 +40,6 @@ public class JobRebalancer extends TaskRebalancer { long startTime = System.currentTimeMillis(); final String jobName = resource.getResourceName(); LOG.debug("Computer Best Partition for job: " + jobName); - if (_jobDispatcher == null) { _jobDispatcher = new JobDispatcher(); } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index e75fa82..faaeb51 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -47,19 +47,23 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher implements Rebalancer, MappingCalculator { private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class); - @Override public void init(HelixManager manager) { + @Override + public void init(HelixManager manager) { _manager = manager; } - @Override public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, + @Override + public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput); - @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { // All of the heavy lifting is in the ResourceAssignment computation, // so this part can just be a no-op. return currentIdealState; } + } diff --git a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java index 5bd3bdb..bd843de 100644 --- a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java @@ -213,4 +213,3 @@ public class WeightAwareRebalanceUtil { _stateModelDefs.put(stateModelDefRef, stateModelDefinition); } } - diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java index 83f3f4d..77e8a48 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java @@ -6,7 +6,7 @@ package org.apache.helix.controller.stages; * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance +// * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -54,8 +54,6 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap( resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline")); - - // Initialize bestpossible state and current state BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java index a95b424..d29ce0e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java @@ -196,4 +196,4 @@ public class TestRecoveryLoadBalance extends BaseStageTest { _clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig)); setClusterConfig(_clusterConfig); } -} \ No newline at end of file +}
