[HELIX-669] Refactor the cancellation exception handling logic
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6775cd3f Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6775cd3f Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6775cd3f Branch: refs/heads/master Commit: 6775cd3ffeebc9664cb042147015621500a32137 Parents: 804ff7c Author: Junkai Xue <j...@linkedin.com> Authored: Thu Feb 16 18:53:35 2017 -0800 Committer: Junkai Xue <j...@linkedin.com> Committed: Tue Oct 3 14:42:14 2017 -0700 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 2 +- .../handling/HelixStateTransitionHandler.java | 26 ++++++++++++++------ .../helix/messaging/handling/HelixTask.java | 8 ------ .../TestStateTransitionCancellation.java | 2 ++ 4 files changed, 21 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/6775cd3f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 93555cd..1cd0b2c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -58,7 +58,7 @@ import com.google.common.collect.Sets; public class ClusterDataCache { private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; - private ClusterConfig _clusterConfig; + ClusterConfig _clusterConfig; Map<String, LiveInstance> _liveInstanceMap; Map<String, LiveInstance> _liveInstanceCacheMap; Map<String, IdealState> _idealStateMap; 
http://git-wip-us.apache.org/repos/asf/helix/blob/6775cd3f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index fa9def6..e07ca90 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -202,6 +202,9 @@ public class HelixStateTransitionHandler extends MessageHandler { // if the partition is not to be dropped, update _stateModel to the TO_STATE _stateModel.updateState(toState); } + } else if (taskResult.isCancelled()) { + // Cancelled message does not need current state update + return; } else { if (exception instanceof HelixStateMismatchException) { // if fromState mismatch, set current state on zk to stateModel's current state @@ -304,15 +307,22 @@ public class HelixStateTransitionHandler extends MessageHandler { e = (InterruptedException) e.getCause(); } - if (e.getCause() != null && e.getCause() instanceof HelixRollbackException) { - throw new HelixRollbackException(e.getCause()); + if (e instanceof HelixRollbackException || (e.getCause() != null + && e.getCause() instanceof HelixRollbackException)) { + // TODO : Support cancel to any state + logger.info( + "Rollback happened of state transition on resource \"" + _message.getResourceName() + + "\" partition \"" + _message.getPartitionName() + "\" from \"" + _message + .getFromState() + "\" to \"" + _message.getToState() + "\""); + taskResult.setCancelled(true); + } else { + _statusUpdateUtil + .logError(message, HelixStateTransitionHandler.class, e, errorMessage, accessor); + taskResult.setSuccess(false); + taskResult.setMessage(e.toString()); + 
taskResult.setException(e); + taskResult.setInterrupted(e instanceof InterruptedException); } - _statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e, errorMessage, - accessor); - taskResult.setSuccess(false); - taskResult.setMessage(e.toString()); - taskResult.setException(e); - taskResult.setInterrupted(e instanceof InterruptedException); } // add task result to context for postHandling http://git-wip-us.apache.org/repos/asf/helix/blob/6775cd3f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 05e9b89..a13e95d 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -95,14 +95,6 @@ public class HelixTask implements MessageTask { _statusUpdateUtil.logError(_message, HelixTask.class, e, "State transition interrupted, timeout:" + _isTimeout, accessor); logger.info("Message " + _message.getMsgId() + " is interrupted"); - } catch (HelixRollbackException e) { - // TODO : Support cancel to any state - logger.info( - "Rollback happened of state transition on resource \"" + _message.getResourceName() - + "\" partition \"" + _message.getPartitionName() + "\" from \"" + _message - .getFromState() + "\" to \"" + _message.getToState() + "\""); - taskResult = new HelixTaskResult(); - taskResult.setCancelled(true); } catch (Exception e) { taskResult = new HelixTaskResult(); taskResult.setException(e); http://git-wip-us.apache.org/repos/asf/helix/blob/6775cd3f/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java index f001438..951b6d2 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java @@ -59,6 +59,7 @@ public class TestStateTransitionCancellation extends TaskTestBase { _numParitions = 20; _numNodes = 2; _numReplicas = 2; + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursive(namespace); @@ -94,6 +95,7 @@ public class TestStateTransitionCancellation extends TaskTestBase { _setupTool.getClusterManagementTool() .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false); + // Wait for pipeline reaching final stage Thread.sleep(2000L); ExternalView externalView = _setupTool.getClusterManagementTool()