This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit dad3f36a1b8aeb7590214069141915a8bf932ec6
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Nov 5 16:55:22 2018 -0800

    Integrate JobIterator for existing pipeline
    
The existing pipeline still loops over all the jobs inside the workflow. When 
JobIterator is ready, we shall change the looping mechanism to apply the 
JobIterator and skip jobs when the quota is used up.
---
 .../apache/helix/common/caches/TaskDataCache.java  | 50 ++++++++++++++-
 .../helix/controller/stages/ClusterDataCache.java  |  6 ++
 .../stages/task/TaskSchedulingStage.java           |  5 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  | 26 ++++++--
 .../main/java/org/apache/helix/task/JobDag.java    | 20 +++++-
 .../java/org/apache/helix/task/JobDispatcher.java  |  7 +-
 .../java/org/apache/helix/task/JobRebalancer.java  | 22 +------
 .../java/org/apache/helix/task/RuntimeJobDag.java  | 44 +++++++++++--
 .../org/apache/helix/task/WorkflowDispatcher.java  | 36 +++++++++--
 .../org/apache/helix/task/WorkflowRebalancer.java  |  3 +-
 .../TestStateTransitionTimeoutWithResource.java    |  2 +-
 .../TestAutoRebalancePartitionLimit.java           |  2 +-
 .../helix/integration/task/TaskTestUtil.java       | 74 ++++++++++++----------
 .../integration/task/TestScheduleDelayTask.java    | 18 +++---
 .../task/TestTaskRebalancerParallel.java           |  9 ++-
 .../integration/task/TestWorkflowTermination.java  |  2 +-
 .../apache/helix/task/TestCleanExpiredJobs.java    |  6 +-
 .../apache/helix/task/TestScheduleDelayJobs.java   |  6 +-
 18 files changed, 236 insertions(+), 102 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java 
b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 95ee62d..afe552e 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -36,6 +36,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.RuntimeJobDag;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -51,6 +52,7 @@ public class TaskDataCache extends AbstractDataCache {
 
   private String _clusterName;
   private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
+  private Map<String, RuntimeJobDag> _runtimeJobDagMap = new HashMap<>();
   private Map<String, WorkflowConfig> _workflowConfigMap = new 
ConcurrentHashMap<>();
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
   private Set<String> _contextToUpdate = new HashSet<>();
@@ -83,18 +85,53 @@ public class TaskDataCache extends AbstractDataCache {
     refreshJobContexts(accessor);
     // update workflow and job configs.
     _workflowConfigMap.clear();
-    _jobConfigMap.clear();
+    Map<String, JobConfig> newJobConfigs = new HashMap<>();
+    Set<String> workflowsUpdated = new HashSet<>();
     for (Map.Entry<String, ResourceConfig> entry : 
resourceConfigMap.entrySet()) {
       if (entry.getValue().getRecord().getSimpleFields()
           .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
         _workflowConfigMap.put(entry.getKey(), new 
WorkflowConfig(entry.getValue()));
+        if (!_runtimeJobDagMap.containsKey(entry.getKey())) {
+          WorkflowConfig workflowConfig = 
_workflowConfigMap.get(entry.getKey());
+          _runtimeJobDagMap.put(entry.getKey(), new 
RuntimeJobDag(workflowConfig.getJobDag(),
+              workflowConfig.isJobQueue() || !workflowConfig.isTerminable(),
+              workflowConfig.getParallelJobs()));
+        }
       } else if (entry.getValue().getRecord().getSimpleFields()
           
.containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
-        _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue()));
+        newJobConfigs.put(entry.getKey(), new JobConfig(entry.getValue()));
+      }
+    }
+
+    // The following 3 blocks is for finding a list of workflows whose JobDAGs 
have been changed
+    // because their RuntimeJobDags would need to be re-built
+    // newly added jobs
+    for (String jobName : newJobConfigs.keySet()) {
+      if (!_jobConfigMap.containsKey(jobName) && 
newJobConfigs.get(jobName).getWorkflow() != null) {
+        workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow());
       }
     }
-    _dispatchedJobs.clear();
 
+    // Removed jobs
+    for (String jobName : _jobConfigMap.keySet()) {
+      if (!newJobConfigs.containsKey(jobName) && 
_jobConfigMap.get(jobName).getWorkflow() != null) {
+        workflowsUpdated.add(_jobConfigMap.get(jobName).getWorkflow());
+      }
+    }
+
+    // Combine all the workflows' job dag which need update
+    for (String changedWorkflow : workflowsUpdated) {
+      if (_workflowConfigMap.containsKey(changedWorkflow)) {
+        WorkflowConfig workflowConfig = 
_workflowConfigMap.get(changedWorkflow);
+        _runtimeJobDagMap.put(changedWorkflow, new 
RuntimeJobDag(workflowConfig.getJobDag(),
+            workflowConfig.isJobQueue() || !workflowConfig.isTerminable(),
+            workflowConfig.getParallelJobs()));
+      }
+    }
+
+    _dispatchedJobs.clear();
+    _runtimeJobDagMap.keySet().retainAll(_workflowConfigMap.keySet());
+    _jobConfigMap = newJobConfigs;
     return true;
   }
 
@@ -319,4 +356,11 @@ public class TaskDataCache extends AbstractDataCache {
   public Set<String> getDispatchedJobs() {
     return _dispatchedJobs;
   }
+
+  public RuntimeJobDag getRuntimeJobDag(String workflowName) {
+    if (_runtimeJobDagMap.containsKey(workflowName)) {
+      return _runtimeJobDagMap.get(workflowName);
+    }
+    return null;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 571267e..c240e23 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -58,6 +58,8 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.RuntimeJobDag;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.WorkflowConfig;
@@ -1035,4 +1037,8 @@ public class ClusterDataCache extends AbstractDataCache {
   public Set<String> getDispatchedJobs() {
     return _taskDataCache.getDispatchedJobs();
   }
+
+  public RuntimeJobDag getRuntimeJobDag(String workflowName) {
+    return _taskDataCache.getRuntimeJobDag(workflowName);
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 218f470..5614e98 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -65,12 +65,12 @@ public class TaskSchedulingStage extends AbstractBaseStage {
     final BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
-
   }
 
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, 
Resource> resourceMap,
       CurrentStateOutput currentStateOutput) {
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
+    // After compute all workflows and jobs, there are still task resources 
need to be DROPPED
     Map<String, Resource> restOfResources = new HashMap<>(resourceMap);
     BestPossibleStateOutput output = new BestPossibleStateOutput();
     final List<String> failureResources = new ArrayList<>();
@@ -253,7 +253,8 @@ public class TaskSchedulingStage extends AbstractBaseStage {
             WorkflowContext context = _workflowDispatcher
                 .getOrInitializeWorkflowContext(workflowId, 
cache.getTaskDataCache());
             _workflowDispatcher
-                .updateWorkflowStatus(workflowId, 
cache.getWorkflowConfig(workflowId), context);
+                .updateWorkflowStatus(workflowId, 
cache.getWorkflowConfig(workflowId), context,
+                    currentStateOutput, bestPossibleOutput);
             _workflowDispatcher
                 .assignWorkflow(workflowId, 
cache.getWorkflowConfig(workflowId), context,
                     currentStateOutput, bestPossibleOutput, resourceMap);
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index f340d71..b2f1120 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -424,7 +424,7 @@ public abstract class AbstractTaskDispatcher {
       WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap,
       ClusterDataCache clusterDataCache) {
     markJobFailed(jobName, jobContext, workflowConfig, workflowContext, 
jobConfigMap,
-        clusterDataCache.getTaskDataCache());
+        clusterDataCache);
     // Mark all INIT task to TASK_ABORTED
     for (int pId : jobContext.getPartitionSet()) {
       if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) {
@@ -761,10 +761,11 @@ public abstract class AbstractTaskDispatcher {
   protected void markJobComplete(String jobName, JobContext jobContext,
       WorkflowConfig workflowConfig, WorkflowContext workflowContext,
       Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) {
+    finishJobInRuntimeJobDag(clusterDataCache, workflowConfig.getWorkflowId(), 
jobName);
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.COMPLETED);
     jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, 
clusterDataCache.getTaskDataCache())) {
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, 
clusterDataCache)) {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
@@ -773,7 +774,8 @@ public abstract class AbstractTaskDispatcher {
 
   protected void markJobFailed(String jobName, JobContext jobContext, 
WorkflowConfig workflowConfig,
       WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap,
-      TaskDataCache clusterDataCache) {
+      ClusterDataCache clusterDataCache) {
+    finishJobInRuntimeJobDag(clusterDataCache, workflowConfig.getWorkflowId(), 
jobName);
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.FAILED);
     if (jobContext != null) {
@@ -811,7 +813,7 @@ public abstract class AbstractTaskDispatcher {
    *         returns false otherwise.
    */
   protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
-      Map<String, JobConfig> jobConfigMap, TaskDataCache clusterDataCache) {
+      Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) {
     boolean incomplete = false;
 
     TaskState workflowState = ctx.getWorkflowState();
@@ -1019,7 +1021,7 @@ public abstract class AbstractTaskDispatcher {
    */
   protected boolean isJobReadyToSchedule(String job, WorkflowConfig 
workflowCfg,
       WorkflowContext workflowCtx, int incompleteAllCount, Map<String, 
JobConfig> jobConfigMap,
-      TaskDataCache clusterDataCache) {
+      ClusterDataCache clusterDataCache) {
     int notStartedCount = 0;
     int failedOrTimeoutCount = 0;
     int incompleteParentCount = 0;
@@ -1104,4 +1106,18 @@ public abstract class AbstractTaskDispatcher {
       output.setState(resource, partition, newStateMap);
     }
   }
+
+  protected void finishJobInRuntimeJobDag(ClusterDataCache clusterDataCache, 
String workflowName,
+      String jobName) {
+    RuntimeJobDag runtimeJobDag = 
clusterDataCache.getRuntimeJobDag(workflowName);
+    if (runtimeJobDag != null) {
+      runtimeJobDag.finishJob(jobName);
+      LOG.debug(
+          String.format("Finish job %s of workflow %s for runtime job DAG", 
jobName, workflowName));
+    } else {
+      LOG.warn(String
+          .format("Failed to find runtime job DAG for workflow %s and job %s", 
workflowName,
+              jobName));
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 1816b8e..9a7c9e3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -21,12 +21,15 @@ package org.apache.helix.task;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 
 import java.util.TreeMap;
 import java.util.TreeSet;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,12 +49,14 @@ public class JobDag {
   protected Map<String, Set<String>> _childrenToParents;
 
   @JsonProperty("allNodes")
-  private Set<String> _allNodes;
+  protected Set<String> _allNodes;
   protected Set<String> _independentNodes; // Un-parented nodes are stored to 
avoid repeated calculation
   // unless there is a DAG modification
 
   public static final JobDag EMPTY_DAG = new JobDag();
 
+  protected Iterator<String> _jobIterator;
+
   /**
    * Constructor for Job DAG.
    */
@@ -263,4 +268,15 @@ public class JobDag {
       }
     }
   }
-}
\ No newline at end of file
+
+  @JsonIgnore
+  public String getNextJob() {
+    if (_jobIterator == null) {
+      _jobIterator = _allNodes.iterator();
+    }
+    if (_jobIterator.hasNext()) {
+      return _jobIterator.next();
+    }
+    return null;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 20ce23f..9d602a4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -35,7 +35,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     _clusterDataCache = cache;
   }
 
-  public ResourceAssignment processJobStatusUpdateandAssignment(String jobName,
+  public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
       CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
     // Fetch job configuration
     JobConfig jobCfg = _clusterDataCache.getJobConfig(jobName);
@@ -52,7 +52,6 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
-
     if (workflowCtx == null) {
       LOG.error("Workflow context is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -75,6 +74,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       LOG.info(String.format(
           "Workflow %s or job %s is already failed or completed, workflow 
state (%s), job state (%s), clean up job IS.",
           workflowResource, jobName, workflowState, jobState));
+      finishJobInRuntimeJobDag(_clusterDataCache, workflowResource, jobName);
       TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), 
jobName);
       _rebalanceScheduler.removeScheduledRebalance(jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -87,7 +87,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     if (!TaskUtil.isJobStarted(jobName, workflowCtx) && 
!isJobReadyToSchedule(jobName, workflowCfg,
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
-        _clusterDataCache.getJobConfigMap(), 
_clusterDataCache.getTaskDataCache())) {
+        _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
       LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
@@ -267,6 +267,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // can be dropped(note that Helix doesn't track whether the drop is 
success or not).
     if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, 
currStateOutput)) {
       handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
+      finishJobInRuntimeJobDag(cache, workflowConfig.getWorkflowId(), 
jobResource);
       return buildEmptyAssignment(jobResource, currStateOutput);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 1bb3ff4..334b605 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -19,34 +19,14 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
 /**
  * Custom rebalancer implementation for the {@code Job} in task model.
  */
@@ -68,7 +48,7 @@ public class JobRebalancer extends TaskRebalancer {
     _jobDispatcher.updateCache(clusterData);
     _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
     ResourceAssignment resourceAssignment = _jobDispatcher
-        .processJobStatusUpdateandAssignment(jobName, currStateOutput,
+        .processJobStatusUpdateAndAssignment(jobName, currStateOutput,
             
clusterData.getWorkflowContext(clusterData.getJobConfig(jobName).getWorkflow()));
     LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s",
         System.currentTimeMillis() - startTime, jobName));
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java 
b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index 993084a..e63e3b4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -20,7 +20,9 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
@@ -42,14 +44,18 @@ import org.slf4j.LoggerFactory;
  */
 public class RuntimeJobDag extends JobDag {
   private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeJobDag.class);
+  private static final int DEFAULT_NUM_PARALLEL_JOBS = 1;
 
   // For job iterator functionality
-  private Queue<String> _readyJobList; // Jobs ready to be scheduled
+  private ArrayDeque<String> _readyJobList; // Jobs ready to be scheduled
   private Set<String> _inflightJobList; // Jobs that are scheduled but not yet 
finished
   private boolean _hasDagChanged; // Flag for DAG modification for job queues; 
if true, ready-list
                                   // must be re-computed
   private Map<String, Set<String>> _successorMap; // Two dependency maps for 
populating ready-list
   private Map<String, Set<String>> _predecessorMap; // when jobs are finished
+  private boolean _isJobQueue;
+  private int _numParallelJobs;
+  private String _lastJob;
 
   /**
    * Constructor for Job DAG.
@@ -61,6 +67,15 @@ public class RuntimeJobDag extends JobDag {
     _hasDagChanged = true;
   }
 
+  public RuntimeJobDag(JobDag jobDag, boolean isJobQueue, int numParallelJobs) 
{
+    this._childrenToParents = jobDag.getChildrenToParents();
+    this._parentsToChildren = jobDag.getParentsToChildren();
+    this._allNodes = jobDag.getAllNodes();
+    this._isJobQueue = isJobQueue;
+    this._numParallelJobs = numParallelJobs <= 0 ? DEFAULT_NUM_PARALLEL_JOBS : 
numParallelJobs;
+    generateJobList();
+  }
+
   @Override
   public void addParentToChild(String parent, String child) {
     _hasDagChanged = true;
@@ -103,6 +118,7 @@ public class RuntimeJobDag extends JobDag {
    * re-generates ready-list.
    * @return job name. Null if the readyJobList is empty.
    */
+  @Override
   public String getNextJob() {
     if (_hasDagChanged) {
       generateJobList(); // Regenerate the ready list
@@ -113,6 +129,7 @@ public class RuntimeJobDag extends JobDag {
     }
     String nextJob = _readyJobList.poll();
     _inflightJobList.add(nextJob);
+    _lastJob = nextJob;
     return nextJob;
   }
 
@@ -131,9 +148,13 @@ public class RuntimeJobDag extends JobDag {
           String.format("Job: %s has either finished already, never been 
scheduled, or been removed from DAG", job));
     }
     // Add finished job's successors to ready-list
-    if (_successorMap.containsKey(job)) {
-      for (String successor : _successorMap.get(job)) {
-        // Remove finished job from predecessor map
+    if (_isJobQueue) {
+      if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
+        
_readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next());
+      }
+    } else if (_successorMap.containsKey(job)) {
+        for (String successor : _successorMap.get(job)) {
+          // Remove finished job from predecessor map
         if (_predecessorMap.containsKey(successor)) {
           Set<String> predecessors = _predecessorMap.get(successor);
           predecessors.remove(job);
@@ -160,6 +181,14 @@ public class RuntimeJobDag extends JobDag {
     resetJobListAndDependencyMaps();
     computeIndependentNodes();
     _readyJobList.addAll(_independentNodes);
+    if (_isJobQueue && _readyJobList.size() > 0) {
+      // For job queue, only get number of parallel jobs to run in the ready 
list.
+      for (int i = 1; i < _numParallelJobs; i++) {
+        if (_parentsToChildren.containsKey(_readyJobList.peekLast())) {
+          
_readyJobList.offer(_parentsToChildren.get(_readyJobList.peekLast()).iterator().next());
+        }
+      }
+    }
     _hasDagChanged = false;
   }
 
@@ -179,4 +208,9 @@ public class RuntimeJobDag extends JobDag {
       _predecessorMap.put(entry.getKey(), new HashSet<>(entry.getValue()));
     }
   }
-}
\ No newline at end of file
+
+  public Set<String> getInflightJobList() {
+    return new HashSet<>(_inflightJobList);
+  }
+
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index f5b76b1..319aea7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -50,7 +50,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
 
   // Split it into status update and assign. But there are couple of data need
   // to pass around.
-  public void updateWorkflowStatus(String workflow, WorkflowConfig 
workflowCfg, WorkflowContext workflowCtx) {
+  public void updateWorkflowStatus(String workflow, WorkflowConfig 
workflowCfg, WorkflowContext workflowCtx, CurrentStateOutput 
currentStateOutput, BestPossibleStateOutput bestPossibleOutput) {
 
     // Fetch workflow configuration and context
     if (workflowCfg == null) {
@@ -106,7 +106,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
     // Note that COMPLETE and FAILED will be marked in markJobComplete / 
markJobFailed
     // This is to handle TIMED_OUT only
     if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && 
isWorkflowFinished(workflowCtx,
-        workflowCfg, _clusterDataCache.getJobConfigMap(), 
_clusterDataCache.getTaskDataCache())) {
+        workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
       workflowCtx.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowCtx, workflowCfg);
       _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
@@ -138,6 +138,18 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
       }
     }
 
+    // Update jobs already inflight
+    RuntimeJobDag runtimeJobDag = _clusterDataCache.getRuntimeJobDag(workflow);
+    if (runtimeJobDag != null) {
+      for (String inflightJob : runtimeJobDag.getInflightJobList()) {
+        processJob(inflightJob, currentStateOutput, bestPossibleOutput, 
workflowCtx);
+      }
+    } else {
+      LOG.warn(String.format(
+          "Failed to find runtime job DAG for workflow %s, existing runtime 
jobs may not be processed correctly for it",
+          workflow));
+    }
+
     _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
   }
 
@@ -164,7 +176,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
     if (isReady) {
       // Schedule jobs from this workflow.
       scheduleJobs(workflow, workflowCfg, workflowCtx, 
_clusterDataCache.getJobConfigMap(),
-          _clusterDataCache, currentStateOutput, bestPossibleOutput, 
resourceMap);
+          _clusterDataCache, currentStateOutput, bestPossibleOutput);
     } else {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
@@ -190,7 +202,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
   private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
       WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
       ClusterDataCache clusterDataCache, CurrentStateOutput currentStateOutput,
-      BestPossibleStateOutput bestPossibleOutput, Map<String, Resource> 
resourceMap) {
+      BestPossibleStateOutput bestPossibleOutput) {
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     if (scheduleConfig != null && scheduleConfig.isRecurring()) {
       LOG.debug("Jobs from recurring workflow are not schedule-able");
@@ -200,7 +212,16 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
     int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, 
workflowCtx);
     int scheduledJobs = 0;
     long timeToSchedule = Long.MAX_VALUE;
-    for (String job : workflowCfg.getJobDag().getAllNodes()) {
+    JobDag jobDag = clusterDataCache.getRuntimeJobDag(workflow);
+    if (jobDag == null) {
+      jobDag = workflowCfg.getJobDag();
+    }
+
+    String nextJob = jobDag.getNextJob();
+    // Assign new jobs
+    while (nextJob != null) {
+      String job = nextJob;
+      nextJob = jobDag.getNextJob();
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
         if (LOG.isDebugEnabled()) {
@@ -220,7 +241,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
 
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, 
inCompleteAllJobCount, jobConfigMap,
-          clusterDataCache.getTaskDataCache())) {
+          clusterDataCache)) {
         JobConfig jobConfig = jobConfigMap.get(job);
         if (jobConfig == null) {
           LOG.error(String.format("The job config is missing for job %s", 
job));
@@ -253,6 +274,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
         }
       }
     }
+
     long currentScheduledTime =
         _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
             : _rebalanceScheduler.getRebalanceTime(workflow);
@@ -266,7 +288,7 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
     _clusterDataCache.dispatchJob(job);
     try {
       ResourceAssignment resourceAssignment =
-          _jobDispatcher.processJobStatusUpdateandAssignment(job, 
currentStateOutput, workflowCtx);
+          _jobDispatcher.processJobStatusUpdateAndAssignment(job, 
currentStateOutput, workflowCtx);
       updateBestPossibleStateOutput(job, resourceAssignment, 
bestPossibleOutput);
     } catch (Exception e) {
       LogUtil.logWarn(LOG, _clusterDataCache.getEventId(),
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index c046bb0..fa67f9d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -49,7 +49,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     _workflowDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
     _workflowDispatcher.updateCache(clusterData);
-    _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, 
workflowCtx);
+    _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, 
workflowCtx, currStateOutput,
+        new BestPossibleStateOutput());
     _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx, 
currStateOutput,
         new BestPossibleStateOutput(), new HashMap<String, Resource>());
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
index cd8f882..7cb3e75 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
@@ -165,7 +165,7 @@ public class TestStateTransitionTimeoutWithResource extends 
ZkStandAloneCMTestBa
     _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, 
TEST_DB, true);
     boolean result =
         ClusterStateVerifier
-            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
+            .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
     Assert.assertTrue(result);
 
     verify(TEST_DB);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
index df4e5ba..23d3bda 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
@@ -118,7 +118,7 @@ public class TestAutoRebalancePartitionLimit extends 
ZkStandAloneCMTestBase {
 
     _participants[1].syncStop();
 
-    // verifyBalanziceExternalView();
+    // verifyBalanceExternalView();
     result =
         ClusterStateVerifier.verifyByPolling(new 
ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java 
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 1887689..f390063 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -21,6 +21,8 @@ package org.apache.helix.integration.task;
 
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -131,42 +133,46 @@ public class TaskTestUtil {
         maxRunningCount = runningCount;
       }
 
-      List<JobContext> jobContextList = new ArrayList<JobContext>();
-      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
-        JobContext jobContext = driver.getJobContext(jobName);
-        if (jobContext != null) {
-          jobContextList.add(driver.getJobContext(jobName));
-        }
-      }
+      Thread.sleep(100);
+    }
 
-      if (!workflowConfig.isAllowOverlapJobAssignment()) {
-        Set<String> instances = new HashSet<String>();
-        for (JobContext jobContext : jobContextList) {
-          for (int partition : jobContext.getPartitionSet()) {
-            String instance = jobContext.getAssignedParticipant(partition);
-            TaskPartitionState taskPartitionState = 
jobContext.getPartitionState(partition);
-
-            if (instance == null) {
-              continue;
-            }
-            if (taskPartitionState != TaskPartitionState.INIT && 
taskPartitionState != TaskPartitionState.RUNNING) {
-              continue;
-            }
-            if (instances.contains(instance)) {
-              return false;
-            }
-
-            TaskPartitionState state = jobContext.getPartitionState(partition);
-            if (state != TaskPartitionState.COMPLETED) {
-              instances.add(instance);
-            }
+    List<JobContext> jobContextList = new ArrayList<>();
+    for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+      JobContext jobContext = driver.getJobContext(jobName);
+      if (jobContext != null) {
+        jobContextList.add(driver.getJobContext(jobName));
+      }
+    }
+    Map<String, List<long[]>> rangeMap = new HashMap<>();
+
+    if (!workflowConfig.isAllowOverlapJobAssignment()) {
+      for (JobContext jobContext : jobContextList) {
+        for (int partition : jobContext.getPartitionSet()) {
+          String instance = jobContext.getAssignedParticipant(partition);
+          if (!rangeMap.containsKey(instance)) {
+            rangeMap.put(instance, new ArrayList<long[]>());
           }
+          rangeMap.get(instance).add(new long[] { 
jobContext.getPartitionStartTime(partition),
+              jobContext.getPartitionFinishTime(partition)
+          });
         }
       }
-
-      Thread.sleep(100);
     }
 
+    for (List<long[]> timeRange : rangeMap.values()) {
+      Collections.sort(timeRange, new Comparator<long[]>() {
+        @Override
+        public int compare(long[] o1, long[] o2) {
+          return (int) (o1[0] - o2[0]);
+        }
+      });
+
+      for (int i = 0; i < timeRange.size() - 1; i++) {
+        if (timeRange.get(i)[1] > timeRange.get(i + 1)[0]) {
+          return false;
+        }
+      }
+    }
     return maxRunningCount > 1 && (workflowConfig.isJobQueue() ? 
maxRunningCount <= workflowConfig
         .getParallelJobs() : true);
   }
@@ -296,16 +302,18 @@ public class TaskTestUtil {
     stage.postProcess();
   }
 
-  public static BestPossibleStateOutput 
calculateBestPossibleState(ClusterDataCache cache,
+  public static BestPossibleStateOutput 
calculateTaskSchedulingStage(ClusterDataCache cache,
       HelixManager manager) throws Exception {
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
     event.addAttribute(AttributeName.PipelineType.name(), "TASK");
 
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> 
asyncFIFOWorkerPool = new HashMap<>();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> 
asyncFIFOWorkerPool =
+        new HashMap<>();
     DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>("ClusterName", 
AsyncWorkerType.TaskJobPurgeWorker.name()) {
+        new DedupEventProcessor<String, Runnable>("ClusterName",
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
           @Override
           protected void handleEvent(Runnable event) {
             // TODO: retry when queue is empty and event.run() failed?
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
index cd14c68..59cfa76 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
@@ -62,11 +62,11 @@ public class TestScheduleDelayTask extends TaskTestBase {
     long jobFinishTime = 0L;
     for (int i = 1; i <= 3; i++) {
       jobFinishTime = Math.max(jobFinishTime,
-          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job1"))
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job" + i))
               .getFinishTime());
     }
-    long jobTwoStartTime =
-        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job4")).getStartTime();
+    long jobTwoStartTime = _driver.getWorkflowContext(workflowName)
+        .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job4"));
 
     Assert.assertTrue(jobTwoStartTime - jobFinishTime >= 2000L);
   }
@@ -90,8 +90,8 @@ public class TestScheduleDelayTask extends TaskTestBase {
     _driver.pollForJobState(workflowName, 
TaskUtil.getNamespacedJobName(workflowName, "Job2"),
         TaskState.COMPLETED);
 
-    long jobTwoStartTime =
-        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job2")).getStartTime();
+    long jobTwoStartTime = _driver.getWorkflowContext(workflowName)
+        .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job2"));
 
     Assert.assertTrue(jobTwoStartTime - currentTime >= 5000L);
   }
@@ -118,8 +118,8 @@ public class TestScheduleDelayTask extends TaskTestBase {
     long jobFinishTime =
         _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job3")).getFinishTime();
 
-    long jobTwoStartTime =
-        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job4")).getStartTime();
+    long jobTwoStartTime = _driver.getWorkflowContext(workflowName)
+        .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job4"));
 
     Assert.assertTrue(jobTwoStartTime - jobFinishTime >= 2000L);
   }
@@ -145,8 +145,8 @@ public class TestScheduleDelayTask extends TaskTestBase {
     _driver.pollForJobState(workflowName, 
TaskUtil.getNamespacedJobName(workflowName, "Job2"),
         TaskState.COMPLETED);
 
-    long jobTwoStartTime =
-        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, 
"Job2")).getStartTime();
+    long jobTwoStartTime = _driver.getWorkflowContext(workflowName)
+        .getJobStartTime(TaskUtil.getNamespacedJobName(workflowName, "Job2"));
 
     Assert.assertTrue(jobTwoStartTime - currentTime >= 5000L);
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 9910798..f81469c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -39,6 +39,7 @@ public class TestTaskRebalancerParallel extends TaskTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     _numDbs = 4;
+    _partitionVary = false;
     super.beforeClass();
   }
 
@@ -47,7 +48,8 @@ public class TestTaskRebalancerParallel extends TaskTestBase {
    * (1) the number of running job does not exceed configured max allowed 
parallel jobs
    * (2) one instance can only be assigned to one job in the workflow
    */
-  @Test public void testWhenDisallowOverlapJobAssignment() throws Exception {
+  @Test
+  public void testWhenDisallowOverlapJobAssignment() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
     WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(queueName);
@@ -63,7 +65,8 @@ public class TestTaskRebalancerParallel extends TaskTestBase {
     for (String testDbName : _testDbs) {
       jobConfigBuilders.add(
           new 
JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName)
-              .setTargetPartitionStates(Collections.singleton("SLAVE")));
+              .setTargetPartitionStates(Collections.singleton("SLAVE"))
+              
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "1000")));
     }
 
     _driver.stop(queueName);
@@ -71,7 +74,7 @@ public class TestTaskRebalancerParallel extends TaskTestBase {
       _driver.enqueueJob(queueName, "job_" + (i + 1), 
jobConfigBuilders.get(i));
     }
     _driver.resume(queueName);
-    Thread.sleep(2000);
+    Thread.sleep(1000L);
     Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_driver, 
queueName));
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index 1a08468..0ce908f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -209,7 +209,7 @@ public class TestWorkflowTermination extends TaskTestBase {
     String job3 = JOB_NAME + "3";
     String job4 = JOB_NAME + "4";
     long workflowExpiry = 10000;
-    long timeout = 8000;
+    long timeout = 10000;
 
     JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 
1);
     JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, 
true, 1);
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java 
b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index d5dea6c..79c5067 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -82,13 +82,13 @@ public class TestCleanExpiredJobs extends 
TaskSynchronizedTestBase {
     _cache = 
TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), 
CLUSTER_NAME);
     _cache.setTaskCache(true);
     TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
-    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
     Thread.sleep(500);
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
     Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
     _cache.requireFullRefresh();
     _cache.refresh(_manager.getHelixDataAccessor());
-    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
     Thread.sleep(500);
     workflowContext = _driver.getWorkflowContext(queue);
     Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
@@ -117,7 +117,7 @@ public class TestCleanExpiredJobs extends 
TaskSynchronizedTestBase {
     _driver.start(builder.build());
     _cache = 
TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), 
CLUSTER_NAME);
     TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
-    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
     Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2);
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java 
b/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java
index 0ad12ff..b3dfd8e 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java
@@ -56,7 +56,8 @@ public class TestScheduleDelayJobs extends 
TaskSynchronizedTestBase {
     _cache = 
TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), 
CLUSTER_NAME);
     long currentTime = System.currentTimeMillis();
     TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
-    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
     Assert.assertTrue(_testRebalancer.getRebalanceTime(workflowName) - 
currentTime >= 10000L);
   }
 
@@ -80,7 +81,8 @@ public class TestScheduleDelayJobs extends 
TaskSynchronizedTestBase {
     _driver.start(builder.build());
     _cache = 
TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor(), 
CLUSTER_NAME);
     TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
-    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
+    TaskTestUtil.calculateTaskSchedulingStage(_cache, _manager);
     Assert.assertTrue(_testRebalancer.getRebalanceTime(workflowName) == 
currentTime);
   }
 

Reply via email to