Fix issue where Helix keeps sending state transitions. We encountered a problem where Helix keeps sending state transitions for a cluster that is already in a stable state.
The root cause is that the periodic rebalance sends the event without cloning the event object. As a result, the two pipelines share the same cache, which may cause conflicts. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/53a6791e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/53a6791e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/53a6791e Branch: refs/heads/master Commit: 53a6791e745ff71fb2ebaed03f731b8cb00baa57 Parents: c3297ae Author: Junkai Xue <[email protected]> Authored: Fri Sep 7 14:12:12 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Oct 29 14:29:25 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/controller/GenericHelixController.java | 2 +- .../helix/controller/stages/BestPossibleStateCalcStage.java | 8 ++++++++ .../helix/controller/stages/task/TaskSchedulingStage.java | 9 +++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/53a6791e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 6b1244b..800a331 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -216,7 +216,7 @@ public class GenericHelixController implements IdealStateChangeListener, event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); _taskEventQueue.put(event); - _eventQueue.put(event); + _eventQueue.put(event.clone(uid)); logger.info(String .format("Controller rebalance event triggered with event 
type: %s for cluster %s", http://git-wip-us.apache.org/repos/asf/helix/blob/53a6791e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 2000bec..636c6e7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -200,6 +200,14 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { idealState.setStateModelDefRef(resource.getStateModelDefRef()); } + // Skip resources are tasks for regular pipeline + if (idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { + LogUtil.logWarn(logger, _eventId, String + .format("Resource %s should not be processed by %s pipeline", resourceName, + cache.isTaskCache() ? 
"TASK" : "DEFAULT")); + return false; + } + Rebalancer rebalancer = getRebalancer(idealState, resourceName, cache.isMaintenanceModeEnabled()); MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, resourceName); http://git-wip-us.apache.org/repos/asf/helix/blob/53a6791e/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java index cbb0160..fe52390 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java @@ -22,6 +22,7 @@ import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobRebalancer; +import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskRebalancer; import org.apache.helix.task.WorkflowContext; @@ -106,6 +107,14 @@ public class TaskSchedulingStage extends AbstractBaseStage { idealState.setStateModelDefRef(resource.getStateModelDefRef()); } + // Skip the resources are not belonging to task pipeline + if (!idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { + LogUtil.logWarn(logger, _eventId, String + .format("Resource %s should not be processed by %s pipeline", resourceName, + cache.isTaskCache() ? "TASK" : "DEFAULT")); + return false; + } + Rebalancer rebalancer = null; String rebalancerClassName = idealState.getRebalancerClassName(); if (rebalancerClassName != null) {
