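Refactor TaskAssignmentCalculator API

Refactor the TaskAssignmentCalculator API so that it no longer takes the whole ClusterDataCache, since ClusterDataCache is too large and not all of its contents are used. The calculators now receive only the map of resource name to IdealState.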
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb2a9db Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb2a9db Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb2a9db Branch: refs/heads/helix-0.6.x Commit: 7bb2a9db2396a00bb9a721634a2432240679c657 Parents: 0a18726 Author: Junkai Xue <j...@linkedin.com> Authored: Tue Sep 13 16:00:08 2016 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Tue Sep 13 16:00:08 2016 -0700 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 36 ++++++++++++++------ .../FixedTargetTaskAssignmentCalculator.java | 35 +++++++------------ .../helix/task/FixedTargetTaskRebalancer.java | 4 +-- .../task/GenericTaskAssignmentCalculator.java | 6 ++-- .../helix/task/GenericTaskRebalancer.java | 4 +-- .../org/apache/helix/task/JobRebalancer.java | 11 +++--- .../helix/task/TaskAssignmentCalculator.java | 10 +++--- 7 files changed, 56 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index dacf98d..c8ca941 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -453,21 +453,35 @@ public class ClusterDataCache { } /** - * Return all the nodes that are enabled and tagged same as the job. 
- * @param allInstances List of instances to filter with instance tag - * @param instanceTag The instance group tag - * @return A new set contains instance name and that are marked enabled and have same - * tag with job. The original set will not be changed during the filtering + * Return all the live nodes that are enabled + * @return A new set contains live instance name and that are marked enabled */ - public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances, - String instanceTag) { + public Set<String> getAllEnabledLiveInstances() { + return getAllEnabledInstances(null); + } + + /** + * Return all the live nodes that are enabled and tagged same as the job. + * @param instanceTag The instance group tag, could be null, when no instance group specified + * @return A new set contains live instance name and that are marked enabled and have same + * tag with job, only if instance tag input is not null. + */ + public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) { + return getAllEnabledInstances(instanceTag); + } + + private Set<String> getAllEnabledInstances(String instanceTag) { Set<String> enabledTagInstances = new HashSet<String>(); - for (String instance : allInstances) { + for (String instance : _liveInstanceMap.keySet()) { InstanceConfig instanceConfig = _instanceConfigMap.get(instance); - if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig - .containsTag(instanceTag)) { - enabledTagInstances.add(instance); + // Check instance is enabled + if (instanceConfig != null && instanceConfig.getInstanceEnabled()) { + // Check whether it has instance group or not + // If it has instance group, check whether it belongs to that group or not + if (instanceTag == null || instanceConfig.containsTag(instanceTag)) { + enabledTagInstances.add(instance); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java index 09db616..0768b51 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java @@ -48,36 +48,37 @@ import org.apache.log4j.Logger; public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator { private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class); - @Override - public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { - return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx); + @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map<String, IdealState> idealStateMap) { + return getAllTaskPartitions(getTgtIdealState(jobCfg, idealStateMap), jobCfg, jobCtx); } @Override public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - Set<Integer> partitionSet, ClusterDataCache cache) { - IdealState tgtIs = getTgtIdealState(jobCfg, cache); + Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) { + IdealState tgtIs = getTgtIdealState(jobCfg, idealStateMap); if (tgtIs == null) { LOG.warn("Missing target resource for the scheduled job!"); return Collections.emptyMap(); } Set<String> tgtStates = jobCfg.getTargetPartitionStates(); return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, 
tgtStates, partitionSet, - jobContext, cache); + jobContext); } /** * Gets the ideal state of the target resource of this job * @param jobCfg job config containing target resource id - * @param cache snapshot of the cluster containing the task and target resource + * @param idealStateMap the map of resource name map to ideal state * @return target resource ideal state, or null */ - private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) { + private static IdealState getTgtIdealState(JobConfig jobCfg, + Map<String, IdealState> idealStateMap) { String tgtResourceId = jobCfg.getTargetResource(); - return cache.getIdealState(tgtResourceId); + return idealStateMap.get(tgtResourceId); } /** @@ -131,7 +132,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato */ private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment( CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs, - Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) { + Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) { Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>(); for (String instance : instances) { result.put(instance, new TreeSet<Integer>()); @@ -153,18 +154,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato continue; } - InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance); - - if (instanceConfig == null) { - LOG.error("Instance config not found for instance : " + instance); - continue; - } - - if (!instanceConfig.getInstanceEnabled()) { - LOG.debug("Instance has been disabled, ignore instance : " + instance); - continue; - } - String s = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), instance); 
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java index 569fe03..1589c1a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java @@ -43,7 +43,7 @@ import org.apache.helix.model.ResourceAssignment; @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { return taskAssignmentCalculator - .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache); + .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates()); } @Override public Map<String, SortedSet<Integer>> getTaskAssignment( @@ -53,6 +53,6 @@ import org.apache.helix.model.ResourceAssignment; ClusterDataCache cache) { return taskAssignmentCalculator .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext, - workflowCfg, workflowCtx, partitionSet, cache); + workflowCfg, workflowCtx, partitionSet, cache.getIdealStates()); } } http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index fbc7af3..58ba670 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -34,6 +34,7 @@ import java.util.TreeSet; import org.apache.helix.HelixException; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.util.JenkinsHash; @@ -52,7 +53,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator { @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map<String, IdealState> idealStateMap) { Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap(); Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap(); for (TaskConfig taskCfg : taskMap.values()) { @@ -69,7 +71,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator { public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - Set<Integer> partitionSet, ClusterDataCache cache) { + Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) { // Gather input to the full auto rebalancing algorithm LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(); states.put("ONLINE", 1); http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java index 6a005b9..1720fbb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java @@ -42,7 +42,7 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer { public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { return taskAssignmentCalculator - .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache); + .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates()); } @Override @@ -52,6 +52,6 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer { Set<Integer> partitionSet, ClusterDataCache cache) { return taskAssignmentCalculator .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext, - workflowCfg, workflowCtx, partitionSet, cache); + workflowCfg, workflowCtx, partitionSet, cache.getIdealStates()); } } http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 378ad95..cf7f5e6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -131,9 +131,8 @@ public class JobRebalancer extends TaskRebalancer { // Fetch the previous resource assignment from the property store. This is required because of // HELIX-230. Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null - ? 
clusterData.getLiveInstances().keySet() - : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(), - jobCfg.getInstanceGroupTag()); + ? clusterData.getAllEnabledLiveInstances() + : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag()); if (liveInstances.isEmpty()) { LOG.error("No available instance found for job!"); @@ -222,8 +221,8 @@ public class JobRebalancer extends TaskRebalancer { // Process all the current assignments of tasks. TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg); - Set<Integer> allPartitions = - taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache); + Set<Integer> allPartitions = taskAssignmentCal + .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates()); if (allPartitions == null || allPartitions.isEmpty()) { // Empty target partitions, mark the job as FAILED. @@ -424,7 +423,7 @@ public class JobRebalancer extends TaskRebalancer { // Get instance->[partition, ...] mappings for the target resource. 
Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, - workflowConfig, workflowCtx, allPartitions, cache); + workflowConfig, workflowCtx, allPartitions, cache.getIdealStates()); for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) { String instance = entry.getKey(); if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java index a3ed5ab..a6a9ed3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java @@ -2,6 +2,7 @@ package org.apache.helix.task; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceAssignment; import java.util.Collection; @@ -17,11 +18,12 @@ public abstract class TaskAssignmentCalculator { * @param jobCtx the task context * @param workflowCfg the workflow configuration * @param workflowCtx the workflow context - * @param cache cluster snapshot + * @param idealStateMap the map of resource name map to ideal state * @return set of partition numbers */ public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache); + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map<String, IdealState> idealStateMap); /** * Compute an assignment of tasks to instances @@ -34,12 
+36,12 @@ public abstract class TaskAssignmentCalculator { * @param workflowCfg the workflow configuration * @param workflowCtx the workflow context * @param partitionSet the partitions to assign - * @param cache cluster snapshot + * @param idealStateMap the map of resource name map to ideal state * @return map of instances to set of partition numbers */ public abstract Map<String, SortedSet<Integer>> getTaskAssignment( CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, - ClusterDataCache cache); + Map<String, IdealState> idealStateMap); }