This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit dad3f36a1b8aeb7590214069141915a8bf932ec6 Author: Junkai Xue <[email protected]> AuthorDate: Mon Nov 5 16:55:22 2018 -0800 Integrate JobIterator for existing pipeline The existing pipeline still loops over all the jobs inside the workflow. Once JobIterator is ready, we will change the looping mechanism to use the JobIterator and skip jobs once the quota is used up. --- .../apache/helix/common/caches/TaskDataCache.java | 50 ++++++++++++++- .../helix/controller/stages/ClusterDataCache.java | 6 ++ .../stages/task/TaskSchedulingStage.java | 5 +- .../apache/helix/task/AbstractTaskDispatcher.java | 26 ++++++-- .../main/java/org/apache/helix/task/JobDag.java | 20 +++++- .../java/org/apache/helix/task/JobDispatcher.java | 7 +- .../java/org/apache/helix/task/JobRebalancer.java | 22 +------ .../java/org/apache/helix/task/RuntimeJobDag.java | 44 +++++++++++-- .../org/apache/helix/task/WorkflowDispatcher.java | 36 +++++++++-- .../org/apache/helix/task/WorkflowRebalancer.java | 3 +- .../TestStateTransitionTimeoutWithResource.java | 2 +- .../TestAutoRebalancePartitionLimit.java | 2 +- .../helix/integration/task/TaskTestUtil.java | 74 ++++++++++++---------- .../integration/task/TestScheduleDelayTask.java | 18 +++--- .../task/TestTaskRebalancerParallel.java | 9 ++- .../integration/task/TestWorkflowTermination.java | 2 +- .../apache/helix/task/TestCleanExpiredJobs.java | 6 +- .../apache/helix/task/TestScheduleDelayJobs.java | 6 +- 18 files changed, 236 insertions(+), 102 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java index 95ee62d..afe552e 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java @@ -36,6 +36,7 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.task.AssignableInstanceManager; import 
org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; +import org.apache.helix.task.RuntimeJobDag; import org.apache.helix.task.TaskConstants; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; @@ -51,6 +52,7 @@ public class TaskDataCache extends AbstractDataCache { private String _clusterName; private Map<String, JobConfig> _jobConfigMap = new HashMap<>(); + private Map<String, RuntimeJobDag> _runtimeJobDagMap = new HashMap<>(); private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>(); private Map<String, ZNRecord> _contextMap = new HashMap<>(); private Set<String> _contextToUpdate = new HashSet<>(); @@ -83,18 +85,53 @@ public class TaskDataCache extends AbstractDataCache { refreshJobContexts(accessor); // update workflow and job configs. _workflowConfigMap.clear(); - _jobConfigMap.clear(); + Map<String, JobConfig> newJobConfigs = new HashMap<>(); + Set<String> workflowsUpdated = new HashSet<>(); for (Map.Entry<String, ResourceConfig> entry : resourceConfigMap.entrySet()) { if (entry.getValue().getRecord().getSimpleFields() .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) { _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue())); + if (!_runtimeJobDagMap.containsKey(entry.getKey())) { + WorkflowConfig workflowConfig = _workflowConfigMap.get(entry.getKey()); + _runtimeJobDagMap.put(entry.getKey(), new RuntimeJobDag(workflowConfig.getJobDag(), + workflowConfig.isJobQueue() || !workflowConfig.isTerminable(), + workflowConfig.getParallelJobs())); + } } else if (entry.getValue().getRecord().getSimpleFields() .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) { - _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue())); + newJobConfigs.put(entry.getKey(), new JobConfig(entry.getValue())); + } + } + + // The following 3 blocks is for finding a list of workflows whose JobDAGs have been changed + // because their 
RuntimeJobDags would need to be re-built + // newly added jobs + for (String jobName : newJobConfigs.keySet()) { + if (!_jobConfigMap.containsKey(jobName) && newJobConfigs.get(jobName).getWorkflow() != null) { + workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow()); } } - _dispatchedJobs.clear(); + // Removed jobs + for (String jobName : _jobConfigMap.keySet()) { + if (!newJobConfigs.containsKey(jobName) && _jobConfigMap.get(jobName).getWorkflow() != null) { + workflowsUpdated.add(_jobConfigMap.get(jobName).getWorkflow()); + } + } + + // Combine all the workflows' job dag which need update + for (String changedWorkflow : workflowsUpdated) { + if (_workflowConfigMap.containsKey(changedWorkflow)) { + WorkflowConfig workflowConfig = _workflowConfigMap.get(changedWorkflow); + _runtimeJobDagMap.put(changedWorkflow, new RuntimeJobDag(workflowConfig.getJobDag(), + workflowConfig.isJobQueue() || !workflowConfig.isTerminable(), + workflowConfig.getParallelJobs())); + } + } + + _dispatchedJobs.clear(); + _runtimeJobDagMap.keySet().retainAll(_workflowConfigMap.keySet()); + _jobConfigMap = newJobConfigs; return true; } @@ -319,4 +356,11 @@ public class TaskDataCache extends AbstractDataCache { public Set<String> getDispatchedJobs() { return _dispatchedJobs; } + + public RuntimeJobDag getRuntimeJobDag(String workflowName) { + if (_runtimeJobDagMap.containsKey(workflowName)) { + return _runtimeJobDagMap.get(workflowName); + } + return null; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 571267e..c240e23 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -58,6 +58,8 @@ import org.apache.helix.model.StateModelDefinition; import org.apache.helix.task.AssignableInstanceManager; import 
org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; +import org.apache.helix.task.JobDag; +import org.apache.helix.task.RuntimeJobDag; import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.WorkflowConfig; @@ -1035,4 +1037,8 @@ public class ClusterDataCache extends AbstractDataCache { public Set<String> getDispatchedJobs() { return _taskDataCache.getDispatchedJobs(); } + + public RuntimeJobDag getRuntimeJobDag(String workflowName) { + return _taskDataCache.getRuntimeJobDag(workflowName); + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java index 218f470..5614e98 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java @@ -65,12 +65,12 @@ public class TaskSchedulingStage extends AbstractBaseStage { final BestPossibleStateOutput bestPossibleStateOutput = compute(event, resourceMap, currentStateOutput); event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); - } private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, CurrentStateOutput currentStateOutput) { ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); + // After compute all workflows and jobs, there are still task resources need to be DROPPED Map<String, Resource> restOfResources = new HashMap<>(resourceMap); BestPossibleStateOutput output = new BestPossibleStateOutput(); final List<String> failureResources = new ArrayList<>(); @@ -253,7 +253,8 @@ public class TaskSchedulingStage extends AbstractBaseStage { WorkflowContext context = _workflowDispatcher .getOrInitializeWorkflowContext(workflowId, cache.getTaskDataCache()); 
_workflowDispatcher - .updateWorkflowStatus(workflowId, cache.getWorkflowConfig(workflowId), context); + .updateWorkflowStatus(workflowId, cache.getWorkflowConfig(workflowId), context, + currentStateOutput, bestPossibleOutput); _workflowDispatcher .assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId), context, currentStateOutput, bestPossibleOutput, resourceMap); diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index f340d71..b2f1120 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -424,7 +424,7 @@ public abstract class AbstractTaskDispatcher { WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) { markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap, - clusterDataCache.getTaskDataCache()); + clusterDataCache); // Mark all INIT task to TASK_ABORTED for (int pId : jobContext.getPartitionSet()) { if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) { @@ -761,10 +761,11 @@ public abstract class AbstractTaskDispatcher { protected void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) { + finishJobInRuntimeJobDag(clusterDataCache, workflowConfig.getWorkflowId(), jobName); long currentTime = System.currentTimeMillis(); workflowContext.setJobState(jobName, TaskState.COMPLETED); jobContext.setFinishTime(currentTime); - if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, clusterDataCache.getTaskDataCache())) { + if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, clusterDataCache)) { workflowContext.setFinishTime(currentTime); updateWorkflowMonitor(workflowContext, 
workflowConfig); } @@ -773,7 +774,8 @@ public abstract class AbstractTaskDispatcher { protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap, - TaskDataCache clusterDataCache) { + ClusterDataCache clusterDataCache) { + finishJobInRuntimeJobDag(clusterDataCache, workflowConfig.getWorkflowId(), jobName); long currentTime = System.currentTimeMillis(); workflowContext.setJobState(jobName, TaskState.FAILED); if (jobContext != null) { @@ -811,7 +813,7 @@ public abstract class AbstractTaskDispatcher { * returns false otherwise. */ protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg, - Map<String, JobConfig> jobConfigMap, TaskDataCache clusterDataCache) { + Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) { boolean incomplete = false; TaskState workflowState = ctx.getWorkflowState(); @@ -1019,7 +1021,7 @@ public abstract class AbstractTaskDispatcher { */ protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, int incompleteAllCount, Map<String, JobConfig> jobConfigMap, - TaskDataCache clusterDataCache) { + ClusterDataCache clusterDataCache) { int notStartedCount = 0; int failedOrTimeoutCount = 0; int incompleteParentCount = 0; @@ -1104,4 +1106,18 @@ public abstract class AbstractTaskDispatcher { output.setState(resource, partition, newStateMap); } } + + protected void finishJobInRuntimeJobDag(ClusterDataCache clusterDataCache, String workflowName, + String jobName) { + RuntimeJobDag runtimeJobDag = clusterDataCache.getRuntimeJobDag(workflowName); + if (runtimeJobDag != null) { + runtimeJobDag.finishJob(jobName); + LOG.debug( + String.format("Finish job %s of workflow %s for runtime job DAG", jobName, workflowName)); + } else { + LOG.warn(String + .format("Failed to find runtime job DAG for workflow %s and job %s", workflowName, + jobName)); + } + } } diff --git 
a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java index 1816b8e..9a7c9e3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java @@ -21,12 +21,15 @@ package org.apache.helix.task; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.HashSet; import java.util.TreeMap; import java.util.TreeSet; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.codehaus.jackson.annotate.JsonProperty; @@ -46,12 +49,14 @@ public class JobDag { protected Map<String, Set<String>> _childrenToParents; @JsonProperty("allNodes") - private Set<String> _allNodes; + protected Set<String> _allNodes; protected Set<String> _independentNodes; // Un-parented nodes are stored to avoid repeated calculation // unless there is a DAG modification public static final JobDag EMPTY_DAG = new JobDag(); + protected Iterator<String> _jobIterator; + /** * Constructor for Job DAG. 
*/ @@ -263,4 +268,15 @@ public class JobDag { } } } -} \ No newline at end of file + + @JsonIgnore + public String getNextJob() { + if (_jobIterator == null) { + _jobIterator = _allNodes.iterator(); + } + if (_jobIterator.hasNext()) { + return _jobIterator.next(); + } + return null; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 20ce23f..9d602a4 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -35,7 +35,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { _clusterDataCache = cache; } - public ResourceAssignment processJobStatusUpdateandAssignment(String jobName, + public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName, CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) { // Fetch job configuration JobConfig jobCfg = _clusterDataCache.getJobConfig(jobName); @@ -52,7 +52,6 @@ public class JobDispatcher extends AbstractTaskDispatcher { return buildEmptyAssignment(jobName, currStateOutput); } - if (workflowCtx == null) { LOG.error("Workflow context is NULL for " + jobName); return buildEmptyAssignment(jobName, currStateOutput); @@ -75,6 +74,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { LOG.info(String.format( "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.", workflowResource, jobName, workflowState, jobState)); + finishJobInRuntimeJobDag(_clusterDataCache, workflowResource, jobName); TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); _rebalanceScheduler.removeScheduledRebalance(jobName); return buildEmptyAssignment(jobName, currStateOutput); @@ -87,7 +87,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, 
workflowCfg, workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx), - _clusterDataCache.getJobConfigMap(), _clusterDataCache.getTaskDataCache())) { + _clusterDataCache.getJobConfigMap(), _clusterDataCache)) { LOG.info("Job is not ready to run " + jobName); return buildEmptyAssignment(jobName, currStateOutput); } @@ -267,6 +267,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { // can be dropped(note that Helix doesn't track whether the drop is success or not). if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) { handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg); + finishJobInRuntimeJobDag(cache, workflowConfig.getWorkflowId(), jobResource); return buildEmptyAssignment(jobResource, currStateOutput); } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 1bb3ff4..334b605 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -19,34 +19,14 @@ package org.apache.helix.task; * under the License. 
*/ -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.helix.AccessOption; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; -import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; -import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; - /** * Custom rebalancer implementation for the {@code Job} in task model. 
*/ @@ -68,7 +48,7 @@ public class JobRebalancer extends TaskRebalancer { _jobDispatcher.updateCache(clusterData); _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor); ResourceAssignment resourceAssignment = _jobDispatcher - .processJobStatusUpdateandAssignment(jobName, currStateOutput, + .processJobStatusUpdateAndAssignment(jobName, currStateOutput, clusterData.getWorkflowContext(clusterData.getJobConfig(jobName).getWorkflow())); LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s", System.currentTimeMillis() - startTime, jobName)); diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java index 993084a..e63e3b4 100644 --- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java +++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java @@ -20,7 +20,9 @@ package org.apache.helix.task; */ import java.util.ArrayDeque; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; @@ -42,14 +44,18 @@ import org.slf4j.LoggerFactory; */ public class RuntimeJobDag extends JobDag { private static final Logger LOG = LoggerFactory.getLogger(RuntimeJobDag.class); + private static final int DEFAULT_NUM_PARALLEL_JOBS = 1; // For job iterator functionality - private Queue<String> _readyJobList; // Jobs ready to be scheduled + private ArrayDeque<String> _readyJobList; // Jobs ready to be scheduled private Set<String> _inflightJobList; // Jobs that are scheduled but not yet finished private boolean _hasDagChanged; // Flag for DAG modification for job queues; if true, ready-list // must be re-computed private Map<String, Set<String>> _successorMap; // Two dependency maps for populating ready-list private Map<String, Set<String>> _predecessorMap; // when jobs are finished + private boolean _isJobQueue; + private int 
_numParallelJobs; + private String _lastJob; /** * Constructor for Job DAG. @@ -61,6 +67,15 @@ public class RuntimeJobDag extends JobDag { _hasDagChanged = true; } + public RuntimeJobDag(JobDag jobDag, boolean isJobQueue, int numParallelJobs) { + this._childrenToParents = jobDag.getChildrenToParents(); + this._parentsToChildren = jobDag.getParentsToChildren(); + this._allNodes = jobDag.getAllNodes(); + this._isJobQueue = isJobQueue; + this._numParallelJobs = numParallelJobs <= 0 ? DEFAULT_NUM_PARALLEL_JOBS : numParallelJobs; + generateJobList(); + } + @Override public void addParentToChild(String parent, String child) { _hasDagChanged = true; @@ -103,6 +118,7 @@ public class RuntimeJobDag extends JobDag { * re-generates ready-list. * @return job name. Null if the readyJobList is empty. */ + @Override public String getNextJob() { if (_hasDagChanged) { generateJobList(); // Regenerate the ready list @@ -113,6 +129,7 @@ public class RuntimeJobDag extends JobDag { } String nextJob = _readyJobList.poll(); _inflightJobList.add(nextJob); + _lastJob = nextJob; return nextJob; } @@ -131,9 +148,13 @@ public class RuntimeJobDag extends JobDag { String.format("Job: %s has either finished already, never been scheduled, or been removed from DAG", job)); } // Add finished job's successors to ready-list - if (_successorMap.containsKey(job)) { - for (String successor : _successorMap.get(job)) { - // Remove finished job from predecessor map + if (_isJobQueue) { + if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) { + _readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next()); + } + } else if (_successorMap.containsKey(job)) { + for (String successor : _successorMap.get(job)) { + // Remove finished job from predecessor map if (_predecessorMap.containsKey(successor)) { Set<String> predecessors = _predecessorMap.get(successor); predecessors.remove(job); @@ -160,6 +181,14 @@ public class RuntimeJobDag extends JobDag { resetJobListAndDependencyMaps(); 
computeIndependentNodes(); _readyJobList.addAll(_independentNodes); + if (_isJobQueue && _readyJobList.size() > 0) { + // For job queue, only get number of parallel jobs to run in the ready list. + for (int i = 1; i < _numParallelJobs; i++) { + if (_parentsToChildren.containsKey(_readyJobList.peekLast())) { + _readyJobList.offer(_parentsToChildren.get(_readyJobList.peekLast()).iterator().next()); + } + } + } _hasDagChanged = false; } @@ -179,4 +208,9 @@ public class RuntimeJobDag extends JobDag { _predecessorMap.put(entry.getKey(), new HashSet<>(entry.getValue())); } } -} \ No newline at end of file + + public Set<String> getInflightJobList() { + return new HashSet<>(_inflightJobList); + } + +} diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index f5b76b1..319aea7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -50,7 +50,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // Split it into status update and assign. But there are couple of data need // to pass around. 
- public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { + public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) { // Fetch workflow configuration and context if (workflowCfg == null) { @@ -106,7 +106,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed // This is to handle TIMED_OUT only if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx, - workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache.getTaskDataCache())) { + workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache)) { workflowCtx.setFinishTime(currentTime); updateWorkflowMonitor(workflowCtx, workflowCfg); _clusterDataCache.updateWorkflowContext(workflow, workflowCtx); @@ -138,6 +138,18 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { } } + // Update jobs already inflight + RuntimeJobDag runtimeJobDag = _clusterDataCache.getRuntimeJobDag(workflow); + if (runtimeJobDag != null) { + for (String inflightJob : runtimeJobDag.getInflightJobList()) { + processJob(inflightJob, currentStateOutput, bestPossibleOutput, workflowCtx); + } + } else { + LOG.warn(String.format( + "Failed to find runtime job DAG for workflow %s, existing runtime jobs may not be processed correctly for it", + workflow)); + } + _clusterDataCache.updateWorkflowContext(workflow, workflowCtx); } @@ -164,7 +176,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { if (isReady) { // Schedule jobs from this workflow. 
scheduleJobs(workflow, workflowCfg, workflowCtx, _clusterDataCache.getJobConfigMap(), - _clusterDataCache, currentStateOutput, bestPossibleOutput, resourceMap); + _clusterDataCache, currentStateOutput, bestPossibleOutput); } else { LOG.debug("Workflow " + workflow + " is not ready to be scheduled."); } @@ -190,7 +202,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache, CurrentStateOutput currentStateOutput, - BestPossibleStateOutput bestPossibleOutput, Map<String, Resource> resourceMap) { + BestPossibleStateOutput bestPossibleOutput) { ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); if (scheduleConfig != null && scheduleConfig.isRecurring()) { LOG.debug("Jobs from recurring workflow are not schedule-able"); @@ -200,7 +212,16 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx); int scheduledJobs = 0; long timeToSchedule = Long.MAX_VALUE; - for (String job : workflowCfg.getJobDag().getAllNodes()) { + JobDag jobDag = clusterDataCache.getRuntimeJobDag(workflow); + if (jobDag == null) { + jobDag = workflowCfg.getJobDag(); + } + + String nextJob = jobDag.getNextJob(); + // Assign new jobs + while (nextJob != null) { + String job = nextJob; + nextJob = jobDag.getNextJob(); TaskState jobState = workflowCtx.getJobState(job); if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) { if (LOG.isDebugEnabled()) { @@ -220,7 +241,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // check ancestor job status if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap, - clusterDataCache.getTaskDataCache())) { + clusterDataCache)) { JobConfig jobConfig = jobConfigMap.get(job); if (jobConfig == null) { 
LOG.error(String.format("The job config is missing for job %s", job)); @@ -253,6 +274,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { } } } + long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE : _rebalanceScheduler.getRebalanceTime(workflow); @@ -266,7 +288,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { _clusterDataCache.dispatchJob(job); try { ResourceAssignment resourceAssignment = - _jobDispatcher.processJobStatusUpdateandAssignment(job, currentStateOutput, workflowCtx); + _jobDispatcher.processJobStatusUpdateAndAssignment(job, currentStateOutput, workflowCtx); updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput); } catch (Exception e) { LogUtil.logWarn(LOG, _clusterDataCache.getEventId(), diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index c046bb0..fa67f9d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -49,7 +49,8 @@ public class WorkflowRebalancer extends TaskRebalancer { _workflowDispatcher.setClusterStatusMonitor(_clusterStatusMonitor); _workflowDispatcher.updateCache(clusterData); - _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, workflowCtx); + _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, workflowCtx, currStateOutput, + new BestPossibleStateOutput()); _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx, currStateOutput, new BestPossibleStateOutput(), new HashMap<String, Resource>()); diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java index cd8f882..7cb3e75 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java @@ -165,7 +165,7 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, TEST_DB, true); boolean result = ClusterStateVerifier - .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); verify(TEST_DB); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java index df4e5ba..23d3bda 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java @@ -118,7 +118,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase { _participants[1].syncStop(); - // verifyBalanziceExternalView(); + // verifyBalanceExternalView(); result = ClusterStateVerifier.verifyByPolling(new ExternalViewBalancedVerifier(_gZkClient, CLUSTER_NAME, TEST_DB)); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index 1887689..f390063 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -21,6 +21,8 @@ package org.apache.helix.integration.task; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import 
java.util.HashMap; import java.util.HashSet; @@ -131,42 +133,46 @@ public class TaskTestUtil { maxRunningCount = runningCount; } - List<JobContext> jobContextList = new ArrayList<JobContext>(); - for (String jobName : workflowConfig.getJobDag().getAllNodes()) { - JobContext jobContext = driver.getJobContext(jobName); - if (jobContext != null) { - jobContextList.add(driver.getJobContext(jobName)); - } - } + Thread.sleep(100); + } - if (!workflowConfig.isAllowOverlapJobAssignment()) { - Set<String> instances = new HashSet<String>(); - for (JobContext jobContext : jobContextList) { - for (int partition : jobContext.getPartitionSet()) { - String instance = jobContext.getAssignedParticipant(partition); - TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition); - - if (instance == null) { - continue; - } - if (taskPartitionState != TaskPartitionState.INIT && taskPartitionState != TaskPartitionState.RUNNING) { - continue; - } - if (instances.contains(instance)) { - return false; - } - - TaskPartitionState state = jobContext.getPartitionState(partition); - if (state != TaskPartitionState.COMPLETED) { - instances.add(instance); - } + List<JobContext> jobContextList = new ArrayList<>(); + for (String jobName : workflowConfig.getJobDag().getAllNodes()) { + JobContext jobContext = driver.getJobContext(jobName); + if (jobContext != null) { + jobContextList.add(driver.getJobContext(jobName)); + } + } + Map<String, List<long[]>> rangeMap = new HashMap<>(); + + if (!workflowConfig.isAllowOverlapJobAssignment()) { + for (JobContext jobContext : jobContextList) { + for (int partition : jobContext.getPartitionSet()) { + String instance = jobContext.getAssignedParticipant(partition); + if (!rangeMap.containsKey(instance)) { + rangeMap.put(instance, new ArrayList<long[]>()); } + rangeMap.get(instance).add(new long[] { jobContext.getPartitionStartTime(partition), + jobContext.getPartitionFinishTime(partition) + }); } } - - Thread.sleep(100); } + for (List<long[]> 
timeRange : rangeMap.values()) { + Collections.sort(timeRange, new Comparator<long[]>() { + @Override + public int compare(long[] o1, long[] o2) { + return (int) (o1[0] - o2[0]); + } + }); + + for (int i = 0; i < timeRange.size() - 1; i++) { + if (timeRange.get(i)[1] > timeRange.get(i + 1)[0]) { + return false; + } + } + } return maxRunningCount > 1 && (workflowConfig.isJobQueue() ? maxRunningCount <= workflowConfig .getParallelJobs() : true); } @@ -296,16 +302,18 @@ public class TaskTestUtil { stage.postProcess(); } - public static BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache, + public static BestPossibleStateOutput calculateTaskSchedulingStage(ClusterDataCache cache, HelixManager manager) throws Exception { ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown); event.addAttribute(AttributeName.ClusterDataCache.name(), cache); event.addAttribute(AttributeName.helixmanager.name(), manager); event.addAttribute(AttributeName.PipelineType.name(), "TASK"); - Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool = new HashMap<>(); + Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool = + new HashMap<>(); DedupEventProcessor<String, Runnable> worker = - new DedupEventProcessor<String, Runnable>("ClusterName", AsyncWorkerType.TaskJobPurgeWorker.name()) { + new DedupEventProcessor<String, Runnable>("ClusterName", + AsyncWorkerType.TaskJobPurgeWorker.name()) { @Override protected void handleEvent(Runnable event) { // TODO: retry when queue is empty and event.run() failed? 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java index cd14c68..59cfa76 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java @@ -62,11 +62,11 @@ public class TestScheduleDelayTask extends TaskTestBase { long jobFinishTime = 0L; for (int i = 1; i <= 3; i++) { jobFinishTime = Math.max(jobFinishTime, - _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job1")) + _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job" + i)) .getFinishTime()); } - long jobTwoStartTime = - _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job4")).getStartTime(); + long jobTwoStartTime = _driver.getWorkflowContext(workflowName) + .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job4")); Assert.assertTrue(jobTwoStartTime - jobFinishTime >= 2000L); } @@ -90,8 +90,8 @@ public class TestScheduleDelayTask extends TaskTestBase { _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "Job2"), TaskState.COMPLETED); - long jobTwoStartTime = - _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job2")).getStartTime(); + long jobTwoStartTime = _driver.getWorkflowContext(workflowName) + .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job2")); Assert.assertTrue(jobTwoStartTime - currentTime >= 5000L); } @@ -118,8 +118,8 @@ public class TestScheduleDelayTask extends TaskTestBase { long jobFinishTime = _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job3")).getFinishTime(); - long jobTwoStartTime = - _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job4")).getStartTime(); + long jobTwoStartTime = _driver.getWorkflowContext(workflowName) + 
.getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job4")); Assert.assertTrue(jobTwoStartTime - jobFinishTime >= 2000L); } @@ -145,8 +145,8 @@ public class TestScheduleDelayTask extends TaskTestBase { _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "Job2"), TaskState.COMPLETED); - long jobTwoStartTime = - _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job2")).getStartTime(); + long jobTwoStartTime = _driver.getWorkflowContext(workflowName) + .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job2")); Assert.assertTrue(jobTwoStartTime - currentTime >= 5000L); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java index 9910798..f81469c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java @@ -39,6 +39,7 @@ public class TestTaskRebalancerParallel extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception { _numDbs = 4; + _partitionVary = false; super.beforeClass(); } @@ -47,7 +48,8 @@ public class TestTaskRebalancerParallel extends TaskTestBase { * (1) the number of running job does not exceed configured max allowed parallel jobs * (2) one instance can only be assigned to one job in the workflow */ - @Test public void testWhenDisallowOverlapJobAssignment() throws Exception { + @Test + public void testWhenDisallowOverlapJobAssignment() throws Exception { String queueName = TestHelper.getTestMethodName(); WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(queueName); @@ -63,7 +65,8 @@ public class TestTaskRebalancerParallel extends TaskTestBase { for (String testDbName : _testDbs) { jobConfigBuilders.add( new 
JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName) - .setTargetPartitionStates(Collections.singleton("SLAVE"))); + .setTargetPartitionStates(Collections.singleton("SLAVE")) + .setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "1000"))); } _driver.stop(queueName); @@ -71,7 +74,7 @@ public class TestTaskRebalancerParallel extends TaskTestBase { _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i)); } _driver.resume(queueName); - Thread.sleep(2000); + Thread.sleep(1000L); Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_driver, queueName)); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java index 1a08468..0ce908f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java @@ -209,7 +209,7 @@ public class TestWorkflowTermination extends TaskTestBase { String job3 = JOB_NAME + "3"; String job4 = JOB_NAME + "4"; long workflowExpiry = 10000; - long timeout = 8000; + long timeout = 10000; JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 1); JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 1); diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java index d5dea6c..79c5067 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java @@ -82,13 +82,13 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase { _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), CLUSTER_NAME); _cache.setTaskCache(true); 
TaskUtil.setWorkflowContext(_manager, queue, workflowContext); - TaskTestUtil.calculateBestPossibleState(_cache, _manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager); Thread.sleep(500); WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue); Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft); _cache.requireFullRefresh(); _cache.refresh(_manager.getHelixDataAccessor()); - TaskTestUtil.calculateBestPossibleState(_cache, _manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager); Thread.sleep(500); workflowContext = _driver.getWorkflowContext(queue); Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime @@ -117,7 +117,7 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase { _driver.start(builder.build()); _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), CLUSTER_NAME); TaskUtil.setWorkflowContext(_manager, queue, workflowContext); - TaskTestUtil.calculateBestPossibleState(_cache, _manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager); WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue); Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2); } diff --git a/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java index 0ad12ff..b3dfd8e 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java @@ -56,7 +56,8 @@ public class TestScheduleDelayJobs extends TaskSynchronizedTestBase { _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), CLUSTER_NAME); long currentTime = System.currentTimeMillis(); TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext); - TaskTestUtil.calculateBestPossibleState(_cache, _manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, 
_manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager); Assert.assertTrue(_testRebalancer.getRebalanceTime(workflowName) - currentTime >= 10000L); } @@ -80,7 +81,8 @@ public class TestScheduleDelayJobs extends TaskSynchronizedTestBase { _driver.start(builder.build()); _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), CLUSTER_NAME); TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext); - TaskTestUtil.calculateBestPossibleState(_cache, _manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager); + TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager); Assert.assertTrue(_testRebalancer.getRebalanceTime(workflowName) == currentTime); }
