Repository: helix Updated Branches: refs/heads/helix-0.6.x 4ff7e3888 -> 45c17f425
[HELIX-438] Improve task framework retry logic Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/45c17f42 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/45c17f42 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/45c17f42 Branch: refs/heads/helix-0.6.x Commit: 45c17f4251f71f5422b0292cf5e539380a9308e4 Parents: 4ff7e38 Author: Kanak Biscuitwala <ka...@apache.org> Authored: Fri May 23 14:22:48 2014 -0700 Committer: Kanak Biscuitwala <ka...@apache.org> Committed: Tue May 27 13:58:16 2014 -0700 ---------------------------------------------------------------------- .../helix/task/FixedTargetTaskRebalancer.java | 13 +-- .../helix/task/GenericTaskRebalancer.java | 91 ++++++++++++++++++-- .../java/org/apache/helix/task/JobConfig.java | 28 +++++- .../org/apache/helix/task/TaskRebalancer.java | 8 +- .../java/org/apache/helix/task/TaskRunner.java | 14 ++- .../java/org/apache/helix/task/Workflow.java | 47 +++++----- .../org/apache/helix/task/beans/JobBean.java | 3 +- .../task/TestIndependentTaskRebalancer.java | 89 ++++++++++++++----- 8 files changed, 231 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java index dc6fbaa..4c4fee1 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java @@ -19,6 +19,7 @@ package org.apache.helix.task; * under the License. */ +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -51,7 +52,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { @Override public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, - ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg, + ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, ClusterDataCache cache) { IdealState tgtIs = getTgtIdealState(jobCfg, cache); @@ -59,7 +60,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { return Collections.emptyMap(); } Set<String> tgtStates = jobCfg.getTargetPartitionStates(); - return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet, + return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet, jobContext); } @@ -114,7 +115,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { /** * Get partition assignments for the target resource, but only for the partitions of interest. * @param currStateOutput The current state of the instances in the cluster. - * @param instanceList The set of instances. + * @param instances The instances. * @param tgtIs The ideal state of the target resource. * @param tgtStates Only partitions in this set of states will be considered. If null, partitions * do not need to @@ -123,10 +124,10 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { * @return A map of instance vs set of partition ids assigned to that instance. */ private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment( - CurrentStateOutput currStateOutput, Iterable<String> instanceList, IdealState tgtIs, + CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs, Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) { Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>(); - for (String instance : instanceList) { + for (String instance : instances) { result.put(instance, new TreeSet<Integer>()); } @@ -138,7 +139,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { } int pId = partitions.get(0); if (includeSet.contains(pId)) { - for (String instance : instanceList) { + for (String instance : instances) { String s = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), instance); http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java index 9174eb1..8903ae2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -37,6 +38,8 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import com.google.common.base.Function; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -46,6 +49,9 @@ import com.google.common.collect.Sets; * assignment to target partitions and states of another resource */ public class GenericTaskRebalancer extends TaskRebalancer { + /** Reassignment policy for this algorithm */ + private RetryPolicy _retryPolicy = new DefaultRetryReassigner(); + @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { @@ -63,7 +69,7 @@ public class GenericTaskRebalancer extends TaskRebalancer { @Override public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, - ResourceAssignment prevAssignment, Iterable<String> instanceList, JobConfig jobCfg, + ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, ClusterDataCache cache) { // Gather input to the full auto rebalancing algorithm @@ -115,7 +121,7 @@ public class GenericTaskRebalancer extends TaskRebalancer { new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE, new AutoRebalanceStrategy.DefaultPlacementScheme()); List<String> allNodes = - Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache)); + Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); Collections.sort(allNodes); ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes); Map<String, List<String>> preferenceLists = record.getListFields(); @@ -133,6 +139,9 @@ public class GenericTaskRebalancer extends TaskRebalancer { taskAssignment.get(participantName).add(Integer.valueOf(partitionName)); } } + + // Finally, adjust the assignment if tasks have been failing + taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment); return taskAssignment; } @@ -140,14 +149,14 @@ public class GenericTaskRebalancer extends TaskRebalancer { * Filter a list of instances based on targeted resource policies * @param jobCfg the job configuration * @param currStateOutput the current state of all instances in the cluster - * @param instanceList valid instances + * @param instances valid instances * @param cache current snapshot of the cluster * @return a set of instances that can be assigned to */ private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput, - Iterable<String> instanceList, ClusterDataCache cache) { + Iterable<String> instances, ClusterDataCache cache) { // No target resource means any instance is available - Set<String> allInstances = Sets.newHashSet(instanceList); + Set<String> allInstances = Sets.newHashSet(instances); String targetResource = jobCfg.getTargetResource(); if (targetResource == null) { return allInstances; @@ -183,4 +192,76 @@ public class GenericTaskRebalancer extends TaskRebalancer { allInstances.retainAll(eligibleInstances); return allInstances; } + + public interface RetryPolicy { + /** + * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently + * assigned + * @param jobCfg the job configuration + * @param jobCtx the job context + * @param instances instances that can serve tasks + * @param origAssignment the unmodified assignment + * @return the adjusted assignment + */ + Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx, + Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment); + } + + private static class DefaultRetryReassigner implements RetryPolicy { + @Override + public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx, + Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) { + // Compute an increasing integer ID for each instance + BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size()); + int instanceIndex = 0; + for (String instance : instances) { + instanceMap.put(instance, instanceIndex++); + } + + // Move partitions + Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap(); + for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) { + String instance = e.getKey(); + SortedSet<Integer> partitions = e.getValue(); + Integer instanceId = instanceMap.get(instance); + if (instanceId != null) { + for (int p : partitions) { + // Determine for each partition if there have been failures with the current assignment + // strategy, and if so, force a shift in assignment for that partition only + int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p); + int newInstanceId = (instanceId + shiftValue) % instances.size(); + String newInstance = instanceMap.inverse().get(newInstanceId); + if (newInstance == null) { + newInstance = instance; + } + if (!newAssignment.containsKey(newInstance)) { + newAssignment.put(newInstance, new TreeSet<Integer>()); + } + newAssignment.get(newInstance).add(p); + } + } else { + // In case something goes wrong, just keep the previous assignment + newAssignment.put(instance, partitions); + } + } + return newAssignment; + } + + /** + * In case tasks fail, we may not want to schedule them in the same place. This method allows us + * to compute a shifting value so that we can systematically choose other instances to try + * @param jobCfg the job configuration + * @param jobCtx the job context + * @param instances instances that can be chosen + * @param p the partition to look up + * @return the shifting value + */ + private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx, + Collection<String> instances, int p) { + int numAttempts = jobCtx.getPartitionNumAttempts(p); + int maxNumAttempts = jobCfg.getMaxAttemptsPerTask(); + int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1); + return numAttempts / (maxNumAttempts / numInstances); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index b166da1..3f9ab41 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -61,6 +61,8 @@ public class JobConfig { public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition"; /** The maximum number of times the task rebalancer may attempt to execute a task. */ public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask"; + /** The maximum number of times Helix will intentionally move a failing task */ + public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask"; /** The number of concurrent tasks that are allowed to run on an instance. */ public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance"; /** The number of tasks within the job that are allowed to fail. */ @@ -75,6 +77,7 @@ public class JobConfig { public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1; public static final int DEFAULT_FAILURE_THRESHOLD = 0; + public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0; private final String _workflow; private final String _targetResource; @@ -85,13 +88,14 @@ public class JobConfig { private final long _timeoutPerTask; private final int _numConcurrentTasksPerInstance; private final int _maxAttemptsPerTask; + private final int _maxForcedReassignmentsPerTask; private final int _failureThreshold; private final Map<String, TaskConfig> _taskConfigMap; private JobConfig(String workflow, String targetResource, List<String> targetPartitions, Set<String> targetPartitionStates, String command, Map<String, String> jobConfigMap, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, - int failureThreshold, Map<String, TaskConfig> taskConfigMap) { + int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig> taskConfigMap) { _workflow = workflow; _targetResource = targetResource; _targetPartitions = targetPartitions; @@ -101,6 +105,7 @@ public class JobConfig { _timeoutPerTask = timeoutPerTask; _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance; _maxAttemptsPerTask = maxAttemptsPerTask; + _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask; _failureThreshold = failureThreshold; if (taskConfigMap != null) { _taskConfigMap = taskConfigMap; @@ -145,6 +150,10 @@ public class JobConfig { return _maxAttemptsPerTask; } + public int getMaxForcedReassignmentsPerTask() { + return _maxForcedReassignmentsPerTask; + } + public int getFailureThreshold() { return _failureThreshold; } @@ -180,6 +189,7 @@ public class JobConfig { } cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask); cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask); + cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask); cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold); return cfgMap; } @@ -198,6 +208,7 @@ public class JobConfig { private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK; private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK; + private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD; public JobConfig build() { @@ -205,7 +216,7 @@ public class JobConfig { return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, - _maxAttemptsPerTask, _failureThreshold, _taskConfigMap); + _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap); } /** @@ -246,6 +257,10 @@ public class JobConfig { if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) { b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK))); } + if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) { + b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg + .get(MAX_FORCED_REASSIGNMENTS_PER_TASK))); + } if (cfg.containsKey(FAILURE_THRESHOLD)) { b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD))); } @@ -297,6 +312,11 @@ public class JobConfig { return this; } + public Builder setMaxForcedReassignmentsPerTask(int v) { + _maxForcedReassignmentsPerTask = v; + return this; + } + public Builder setFailureThreshold(int v) { _failureThreshold = v; return this; @@ -340,6 +360,10 @@ public class JobConfig { throw new IllegalArgumentException(String.format("%s has invalid value %s", MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask)); } + if (_maxForcedReassignmentsPerTask < 0) { + throw new IllegalArgumentException(String.format("%s has invalid value %s", + MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask)); + } if (_failureThreshold < 0) { throw new IllegalArgumentException(String.format("%s has invalid value %s", FAILURE_THRESHOLD, _failureThreshold)); http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 849f339..a6244c8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -71,7 +72,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { * Compute an assignment of tasks to instances * @param currStateOutput the current state of the instances * @param prevAssignment the previous task partition assignment - * @param instanceList the instances + * @param instances the instances * @param jobCfg the task configuration * @param taskCtx the task context * @param workflowCfg the workflow configuration @@ -82,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { */ public abstract Map<String, SortedSet<Integer>> getTaskAssignment( CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, - Iterable<String> instanceList, JobConfig jobCfg, JobContext jobContext, + Collection<String> instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, ClusterDataCache cache); @@ -181,7 +182,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { private ResourceAssignment computeResourceMapping(String jobResource, WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment, - Iterable<String> liveInstances, CurrentStateOutput currStateOutput, + Collection<String> liveInstances, CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) { TargetState jobTgtState = workflowConfig.getTargetState(); @@ -364,6 +365,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // This includes all completed, failed, already assigned partitions. Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); addCompletedPartitions(excludeSet, jobCtx, allPartitions); + excludeSet.addAll(skippedPartitions); // Get instance->[partition, ...] mappings for the target resource. Map<String, SortedSet<Integer>> tgtPartitionAssignments = getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java index dea383b..7941acb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import org.apache.helix.HelixManager; +import org.apache.helix.task.TaskResult.Status; import org.apache.log4j.Logger; /** @@ -61,7 +62,12 @@ public class TaskRunner implements Runnable { public void run() { try { signalStarted(); - _result = _task.run(); + try { + _result = _task.run(); + } catch (Throwable t) { + LOG.error("Problem running the task", t); + _result = new TaskResult(Status.ERROR, null); + } switch (_result.getStatus()) { case COMPLETED: @@ -93,8 +99,10 @@ public class TaskRunner implements Runnable { * Signals the task to cancel itself. */ public void timeout() { - _timeout = true; - cancel(); + if (!_done) { + _timeout = true; + cancel(); + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 537f287..57404d8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -164,7 +164,9 @@ public class Workflow { Joiner.on(",").join(job.targetPartitions)); } builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK, - String.valueOf(job.maxAttemptsPerPartition)); + String.valueOf(job.maxAttemptsPerTask)); + builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, + String.valueOf(job.maxForcedReassignmentsPerTask)); builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, String.valueOf(job.numConcurrentTasksPerInstance)); builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK, @@ -227,40 +229,41 @@ public class Workflow { _expiry = -1; } - public Builder addConfig(String node, String key, String val) { - node = namespacify(node); - _dag.addNode(node); - if (!_jobConfigs.containsKey(node)) { - _jobConfigs.put(node, new TreeMap<String, String>()); + public Builder addConfig(String job, String key, String val) { + job = namespacify(job); + _dag.addNode(job); + if (!_jobConfigs.containsKey(job)) { + _jobConfigs.put(job, new TreeMap<String, String>()); } - _jobConfigs.get(node).put(key, val); + _jobConfigs.get(job).put(key, val); return this; } - public Builder addJobConfigMap(String node, Map<String, String> jobConfigMap) { - return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap)); + public Builder addJobConfigMap(String job, Map<String, String> jobConfigMap) { + return addConfig(job, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap)); } - public Builder addJobConfig(String node, JobConfig jobConfig) { + public Builder addJobConfig(String job, JobConfig jobConfig) { for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) { String key = e.getKey(); String val = e.getValue(); - addConfig(node, key, val); + addConfig(job, key, val); } - addTaskConfigs(node, jobConfig.getTaskConfigMap().values()); + jobConfig.getJobConfigMap().put(JobConfig.WORKFLOW_ID, _name); + addTaskConfigs(job, jobConfig.getTaskConfigMap().values()); return this; } - public Builder addTaskConfigs(String node, Collection<TaskConfig> taskConfigs) { - node = namespacify(node); - _dag.addNode(node); - if (!_taskConfigs.containsKey(node)) { - _taskConfigs.put(node, new ArrayList<TaskConfig>()); + public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) { + job = namespacify(job); + _dag.addNode(job); + if (!_taskConfigs.containsKey(job)) { + _taskConfigs.put(job, new ArrayList<TaskConfig>()); } - if (!_jobConfigs.containsKey(node)) { - _jobConfigs.put(node, new TreeMap<String, String>()); + if (!_jobConfigs.containsKey(job)) { + _jobConfigs.put(job, new TreeMap<String, String>()); } - _taskConfigs.get(node).addAll(taskConfigs); + _taskConfigs.get(job).addAll(taskConfigs); return this; } @@ -277,8 +280,8 @@ public class Workflow { return this; } - public String namespacify(String task) { - return TaskUtil.getNamespacedJobName(_name, task); + public String namespacify(String job) { + return TaskUtil.getNamespacedJobName(_name, job); } public Workflow build() { http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index af5882c..bc5350a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -38,6 +38,7 @@ public class JobBean { public List<TaskBean> tasks; public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; - public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; + public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; + public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD; } http://git-wip-us.apache.org/repos/asf/helix/blob/45c17f42/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 7041db8..10f0ac7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -62,6 +62,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { private final MockParticipantManager[] _participants = new MockParticipantManager[n]; private ClusterControllerManager _controller; private Set<String> _invokedClasses = Sets.newHashSet(); + private Map<String, Integer> _runCounts = Maps.newHashMap(); private HelixManager _manager; private TaskDriver _driver; @@ -81,24 +82,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); } - // Set task callbacks - Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); - taskFactoryReg.put("TaskOne", new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new TaskOne(context); - } - }); - taskFactoryReg.put("TaskTwo", new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new TaskTwo(context); - } - }); - // start dummy participants for (int i = 0; i < n; i++) { - String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + final String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + + // Set task callbacks + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("TaskOne", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TaskOne(context, instanceName); + } + }); + taskFactoryReg.put("TaskTwo", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TaskTwo(context, instanceName); + } + }); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); // Register a Task state model factory. @@ -124,6 +126,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { @BeforeMethod public void beforeMethod() { _invokedClasses.clear(); + _runCounts.clear(); } @Test @@ -207,10 +210,46 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); } + @Test + public void testReassignment() throws Exception { + final int NUM_INSTANCES = 2; + String jobName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); + List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2); + Map<String, String> taskConfigMap = + Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + + START_PORT)); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); + taskConfigs.add(taskConfig1); + workflowBuilder.addTaskConfigs(jobName, taskConfigs); + workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); + workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance + Map<String, String> jobConfigMap = Maps.newHashMap(); + jobConfigMap.put("Timeout", "1000"); + workflowBuilder.addJobConfigMap(jobName, jobConfigMap); + _driver.start(workflowBuilder.build()); + + // Ensure the job completes + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + + // Ensure that the class was invoked + Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); + + // Ensure that this was tried on two different instances, the first of which exhausted the + // attempts number, and the other passes on the first try + Assert.assertEquals(_runCounts.size(), NUM_INSTANCES); + Assert.assertTrue(_runCounts.values().contains( + JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES)); + Assert.assertTrue(_runCounts.values().contains(1)); + } + private class TaskOne extends ReindexTask { private final boolean _shouldFail; + private final String _instanceName; - public TaskOne(TaskCallbackContext context) { + public TaskOne(TaskCallbackContext context, String instanceName) { super(context); // Check whether or not this task should succeed @@ -220,15 +259,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Map<String, String> configMap = taskConfig.getConfigMap(); if (configMap != null && configMap.containsKey("fail") && Boolean.parseBoolean(configMap.get("fail"))) { - shouldFail = true; + // if a specific instance is specified, only fail for that one + shouldFail = + !configMap.containsKey("failInstance") + || configMap.get("failInstance").equals(instanceName); } } _shouldFail = shouldFail; + + // Initialize the count for this instance if not already done + if (!_runCounts.containsKey(instanceName)) { + _runCounts.put(instanceName, 0); + } + _instanceName = instanceName; } @Override public TaskResult run() { _invokedClasses.add(getClass().getName()); + _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1); // Fail the task if it should fail if (_shouldFail) { @@ -240,8 +289,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { } private class TaskTwo extends TaskOne { - public TaskTwo(TaskCallbackContext context) { - super(context); + public TaskTwo(TaskCallbackContext context, String instanceName) { + super(context, instanceName); } } }