Repository: helix
Updated Branches:
  refs/heads/master cf010f904 -> 9d7364d7a


Refactor WorkflowRebalancer to WorkflowHandler

The current WorkflowRebalancer is a bit messy in that it mixes workflow status
updates and scheduling logic together. Refactor WorkflowRebalancer into a
workflow handler (the new WorkflowDispatcher) that schedules workflows and
updates their status independently of each other.

Also remove the redundant logic from the existing pipeline.
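
At a high level, the refactor turns one monolithic rebalance pass into two
independent calls on the new dispatcher. A minimal sketch of the resulting call
sequence, mirroring what WorkflowRebalancer.computeBestPossiblePartitionState
does after this change (monitor wiring omitted; a sketch, not a drop-in
snippet):

    WorkflowDispatcher dispatcher = new WorkflowDispatcher();
    dispatcher.init(_manager);                               // base-class setup
    dispatcher.updateCache(clusterData.getTaskDataCache());  // hand over task data

    WorkflowContext ctx = dispatcher.getOrInitializeWorkflowContext(
        workflow, clusterData.getTaskDataCache());
    dispatcher.updateWorkflowStatus(workflow, workflowCfg, ctx); // status only
    dispatcher.assignWorkflow(workflow, workflowCfg, ctx);       // scheduling only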


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0c3ac37b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0c3ac37b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0c3ac37b

Branch: refs/heads/master
Commit: 0c3ac37b0b442f20d08eaba86da7d94ec1494d1f
Parents: cf010f9
Author: Junkai Xue <[email protected]>
Authored: Wed Sep 12 18:31:38 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Mon Oct 29 18:05:06 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/TaskDataCache.java      |  27 +-
 .../helix/task/AbstractTaskDispatcher.java      | 129 ++++-
 .../org/apache/helix/task/JobRebalancer.java    |   2 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 120 -----
 .../apache/helix/task/WorkflowDispatcher.java   | 526 +++++++++++++++++++
 .../apache/helix/task/WorkflowRebalancer.java   | 510 +-----------------
 .../scripts/integration-test/script/pexpect.py  |  12 +-
 .../task/TestIndependentTaskRebalancer.java     |   2 +-
 .../integration/task/TestRecurringJobQueue.java |  26 +-
 9 files changed, 701 insertions(+), 653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java 
b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 5d39512..2c42aca 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -214,14 +214,14 @@ public class TaskDataCache extends AbstractDataCache {
     _contextToUpdate.removeAll(_contextToRemove);
     List<String> contextUpdateNames = new ArrayList<>(_contextToUpdate);
     for (String resourceName : contextUpdateNames) {
-      if (_contextMap.get(resourceName) != null && 
!_contextToRemove.contains(resourceName)) {
+      if (_contextMap.get(resourceName) != null) {
         contextUpdatePaths.add(getContextPath(resourceName));
         contextUpdateData.add(_contextMap.get(resourceName));
       }
     }
 
-    boolean[] updateSuccess = accessor.getBaseDataAccessor()
-        .setChildren(contextUpdatePaths, contextUpdateData, 
AccessOption.PERSISTENT);
+    boolean[] updateSuccess =
+        accessor.getBaseDataAccessor().setChildren(contextUpdatePaths, 
contextUpdateData, AccessOption.PERSISTENT);
 
     for (int i = 0; i < updateSuccess.length; i++) {
       if (updateSuccess[i]) {
@@ -230,18 +230,21 @@ public class TaskDataCache extends AbstractDataCache {
     }
 
     // Delete contexts
-    List<String> contextToRemove = new ArrayList<>();
-    List<String> contextToRemoveNames = new ArrayList<>(_contextToRemove);
-    for (String resourceName : contextToRemoveNames) {
-      contextToRemove.add(getContextPath(resourceName));
+    // We cannot leave the contexts in memory here, since some deletions happen
+    // as part of workflow cleanup. If a context were left in memory, Helix
+    // would not let the user recreate a workflow with the same name.
+    // TODO: add a periodic cleanup thread that removes contexts whose deletion failed.
+    List<String> contextPathsToRemove = new ArrayList<>();
+    List<String> contextNamesToRemove = new ArrayList<>(_contextToRemove);
+    for (String resourceName : contextNamesToRemove) {
+      contextPathsToRemove.add(getContextPath(resourceName));
     }
 
+    // TODO: deleting non-existing data currently returns false. Once that
+    // behavior is fixed, we can add retry logic back. Otherwise the context
+    // would stay in memory and block recreating a workflow with the same name.
+    accessor.getBaseDataAccessor().remove(contextPathsToRemove, 
AccessOption.PERSISTENT);
 
-    // Current implementation is stateless operation, since Helix read all the 
contexts back
-    // and redo the works. If it is failed to remove this round, it could be 
removed in next round.
-
-    // Also if the context has already been removed, it should be fine.
-    accessor.getBaseDataAccessor().remove(contextToRemove, 
AccessOption.PERSISTENT);
+    _contextToRemove.clear();
   }
 
   /**
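
The TODOs above anticipate a periodic cleanup thread for contexts whose
deletion returned false. A minimal sketch of what such a sweeper could look
like, assuming a hypothetical retryFailedContextRemovals() helper that replays
the remove() call for the failed paths; none of this exists in the commit:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    ScheduledExecutorService sweeper = Executors.newSingleThreadScheduledExecutor();
    // Every 5 minutes, re-issue accessor.getBaseDataAccessor()
    //     .remove(paths, AccessOption.PERSISTENT)
    // for context paths whose earlier deletion failed (hypothetical helper).
    sweeper.scheduleAtFixedRate(() -> retryFailedContextRemovals(), 5, 5,
        TimeUnit.MINUTES);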

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index bf33d77..617263b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -1,15 +1,19 @@
 package org.apache.helix.task;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
+import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -412,7 +416,7 @@ public abstract class AbstractTaskDispatcher {
       WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap,
       ClusterDataCache clusterDataCache) {
     markJobFailed(jobName, jobContext, workflowConfig, workflowContext, 
jobConfigMap,
-        clusterDataCache);
+        clusterDataCache.getTaskDataCache());
     // Mark all INIT task to TASK_ABORTED
     for (int pId : jobContext.getPartitionSet()) {
       if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) {
@@ -752,7 +756,7 @@ public abstract class AbstractTaskDispatcher {
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.COMPLETED);
     jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, 
clusterDataCache)) {
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, 
clusterDataCache.getTaskDataCache())) {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
@@ -761,7 +765,7 @@ public abstract class AbstractTaskDispatcher {
 
   protected void markJobFailed(String jobName, JobContext jobContext, 
WorkflowConfig workflowConfig,
       WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
+      TaskDataCache clusterDataCache) {
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.FAILED);
     if (jobContext != null) {
@@ -799,7 +803,7 @@ public abstract class AbstractTaskDispatcher {
    *         returns false otherwise.
    */
   protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
-      Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) {
+      Map<String, JobConfig> jobConfigMap, TaskDataCache clusterDataCache) {
     boolean incomplete = false;
 
     TaskState workflowState = ctx.getWorkflowState();
@@ -963,4 +967,121 @@ public abstract class AbstractTaskDispatcher {
     // This is a targeted task
     return pName(jobCfg.getJobId(), partitionNum);
   }
+
+  /**
+   * Checks if the workflow has been stopped.
+   * @param ctx Workflow context containing task states
+   * @param cfg Workflow config containing set of tasks
+   * @return true if all tasks are {@link TaskState#STOPPED}, false otherwise.
+   */
+  protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) 
{
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      TaskState jobState = ctx.getJobState(job);
+      if (jobState != null
+          && (jobState.equals(TaskState.IN_PROGRESS) || 
jobState.equals(TaskState.STOPPING))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected ResourceAssignment buildEmptyAssignment(String name,
+      CurrentStateOutput currStateOutput) {
+    ResourceAssignment assignment = new ResourceAssignment(name);
+    Set<Partition> partitions = 
currStateOutput.getCurrentStateMappedPartitions(name);
+    for (Partition partition : partitions) {
+      Map<String, String> currentStateMap = 
currStateOutput.getCurrentStateMap(name, partition);
+      Map<String, String> replicaMap = Maps.newHashMap();
+      for (String instanceName : currentStateMap.keySet()) {
+        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
+      }
+      assignment.addReplicaMap(partition, replicaMap);
+    }
+    return assignment;
+  }
+
+  /**
+   * Check all the dependencies of a job to determine whether the job is ready
+   * to be scheduled.
+   * @param job
+   * @param workflowCfg
+   * @param workflowCtx
+   * @return
+   */
+  protected boolean isJobReadyToSchedule(String job, WorkflowConfig 
workflowCfg,
+      WorkflowContext workflowCtx, int incompleteAllCount, Map<String, 
JobConfig> jobConfigMap,
+      TaskDataCache clusterDataCache) {
+    int notStartedCount = 0;
+    int failedOrTimeoutCount = 0;
+    int incompleteParentCount = 0;
+
+    for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
+      TaskState jobState = workflowCtx.getJobState(parent);
+      if (jobState == null || jobState == TaskState.NOT_STARTED) {
+        ++notStartedCount;
+      } else if (jobState == TaskState.FAILED || jobState == 
TaskState.TIMED_OUT) {
+        ++failedOrTimeoutCount;
+      } else if (jobState != TaskState.COMPLETED) {
+        incompleteParentCount++;
+      }
+    }
+
+    // If there is any parent job not started, this job should not be scheduled
+    if (notStartedCount > 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Job %s is not ready to start, 
notStartedParent(s)=%d.", job,
+            notStartedCount));
+      }
+      return false;
+    }
+
+    // If a parent job failed, schedule this job only when ignoring dependent
+    // job failures is enabled
+    JobConfig jobConfig = jobConfigMap.get(job);
+    if (jobConfig == null) {
+      LOG.error(String.format("The job config is missing for job %s", job));
+      return false;
+    }
+    if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
+      markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, 
clusterDataCache);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Job %s is not ready to start, 
failedCount(s)=%d.", job,
+            failedOrTimeoutCount));
+      }
+      return false;
+    }
+
+    if (workflowCfg.isJobQueue()) {
+      // If the job comes from a JobQueue, apply the parallel-job logic
+      if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Job %s is not ready to schedule, 
inCompleteJobs(s)=%d.", job,
+              incompleteAllCount));
+        }
+        return false;
+      }
+    } else {
+      // If this job comes from a generic workflow, it will not be scheduled
+      // until all of its direct parent jobs have finished
+      if (incompleteParentCount > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Job %s is not ready to start, 
notFinishedParent(s)=%d.", job,
+              incompleteParentCount));
+        }
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Check if a workflow is ready to schedule.
+   * @param workflowCfg the workflow to check
+   * @return true if the workflow is ready for schedule, false if not ready
+   */
+  protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
+    Date startTime = workflowCfg.getStartTime();
+    // A workflow with a non-scheduled config, or whose start time has passed,
+    // is ready to schedule.
+    return (startTime == null || startTime.getTime() <= 
System.currentTimeMillis());
+  }
 }
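
For readers tracing the moved isJobReadyToSchedule, its gating rules condense
to the following illustrative model; this is a simplified standalone sketch,
not the Helix implementation, and the parameters stand in for the real config
and context lookups:

    // Simplified model; TaskState mirrors org.apache.helix.task.TaskState and
    // parentStates holds each direct parent's state (null = none recorded yet).
    static boolean isReadyToSchedule(java.util.List<TaskState> parentStates,
        boolean ignoreParentFailure, boolean isJobQueue,
        int incompleteAllCount, int parallelJobs) {
      for (TaskState s : parentStates) {
        if (s == null || s == TaskState.NOT_STARTED) {
          return false;                            // a parent has not started
        }
        if ((s == TaskState.FAILED || s == TaskState.TIMED_OUT)
            && !ignoreParentFailure) {
          return false;                            // a parent failed
        }
      }
      if (isJobQueue) {
        return incompleteAllCount < parallelJobs;  // only the parallelism cap
      }
      for (TaskState s : parentStates) {
        if (s != TaskState.COMPLETED && s != TaskState.FAILED
            && s != TaskState.TIMED_OUT) {
          return false;                            // a parent is still running
        }
      }
      return true;                                 // all gates passed
    }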

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index abc260a..143053e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -112,7 +112,7 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (!TaskUtil.isJobStarted(jobName, workflowCtx) && 
!isJobReadyToSchedule(jobName, workflowCfg,
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
-        clusterData.getJobConfigMap(), clusterData)) {
+        clusterData.getJobConfigMap(), clusterData.getTaskDataCache())) {
       LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2f88e24..e75fa82 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -54,127 +54,7 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
   @Override public abstract ResourceAssignment 
computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput 
currStateOutput);
 
-  /**
-   * Checks if the workflow has been stopped.
-   *
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   *
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false 
otherwise.
-   */
-  protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) 
{
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      TaskState jobState = ctx.getJobState(job);
-      if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || 
jobState.equals(TaskState.STOPPING))) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  protected ResourceAssignment buildEmptyAssignment(String name,
-      CurrentStateOutput currStateOutput) {
-    ResourceAssignment assignment = new ResourceAssignment(name);
-    Set<Partition> partitions = 
currStateOutput.getCurrentStateMappedPartitions(name);
-    for (Partition partition : partitions) {
-      Map<String, String> currentStateMap = 
currStateOutput.getCurrentStateMap(name, partition);
-      Map<String, String> replicaMap = Maps.newHashMap();
-      for (String instanceName : currentStateMap.keySet()) {
-        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
-      }
-      assignment.addReplicaMap(partition, replicaMap);
-    }
-    return assignment;
-  }
-
-  /**
-   * Check all the dependencies of a job to determine whether the job is ready 
to be scheduled.
-   *
-   * @param job
-   * @param workflowCfg
-   * @param workflowCtx
-   *
-   * @return
-   */
-  protected boolean isJobReadyToSchedule(String job, WorkflowConfig 
workflowCfg,
-      WorkflowContext workflowCtx, int incompleteAllCount, Map<String, 
JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
-    int notStartedCount = 0;
-    int failedOrTimeoutCount = 0;
-    int incompleteParentCount = 0;
-
-    for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
-      TaskState jobState = workflowCtx.getJobState(parent);
-      if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++notStartedCount;
-      } else if (jobState == TaskState.FAILED || jobState == 
TaskState.TIMED_OUT) {
-        ++failedOrTimeoutCount;
-      } else if (jobState != TaskState.COMPLETED) {
-        incompleteParentCount++;
-      }
-    }
-
-    // If there is any parent job not started, this job should not be scheduled
-    if (notStartedCount > 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, notStartedParent(s)=%d.", 
job, notStartedCount));
-      }
-      return false;
-    }
 
-    // If there is parent job failed, schedule the job only when ignore 
dependent
-    // job failure enabled
-    JobConfig jobConfig = jobConfigMap.get(job);
-    if (jobConfig == null) {
-      LOG.error(String.format("The job config is missing for job %s", job));
-      return false;
-    }
-    if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
-      markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, 
clusterDataCache);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, failedCount(s)=%d.", job, 
failedOrTimeoutCount));
-      }
-      return false;
-    }
-
-    if (workflowCfg.isJobQueue()) {
-      // If job comes from a JobQueue, it should apply the parallel job logics
-      if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Job %s is not ready to schedule, 
inCompleteJobs(s)=%d.", job,
-              incompleteAllCount));
-        }
-        return false;
-      }
-    } else {
-      // If this job comes from a generic workflow, job will not be scheduled 
until
-      // all the direct parent jobs finished
-      if (incompleteParentCount > 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Job %s is not ready to start, 
notFinishedParent(s)=%d.", job,
-              incompleteParentCount));
-        }
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Check if a workflow is ready to schedule.
-   *
-   * @param workflowCfg the workflow to check
-   *
-   * @return true if the workflow is ready for schedule, false if not ready
-   */
-  protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
-    Date startTime = workflowCfg.getStartTime();
-    // Workflow with non-scheduled config or passed start time is ready to 
schedule.
-    return (startTime == null || startTime.getTime() <= 
System.currentTimeMillis());
-  }
 
   @Override public IdealState computeNewIdealState(String resourceName, 
IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
new file mode 100644
index 0000000..db85a14
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -0,0 +1,526 @@
+package org.apache.helix.task;
+
+import com.google.common.collect.Lists;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkflowDispatcher extends AbstractTaskDispatcher {
+  private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowDispatcher.class);
+  private static final Set<TaskState> finalStates = new HashSet<>(
+      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, 
TaskState.TIMED_OUT));
+  private TaskDataCache _taskDataCache;
+
+  public void updateCache(TaskDataCache cache) {
+    _taskDataCache = cache;
+  }
+
+  // Split into a status-update step and an assignment step. A couple of
+  // pieces of data still need to be passed between the two.
+  public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
+
+    // Fetch workflow configuration and context
+    if (workflowCfg == null) {
+      LOG.warn("Workflow configuration is NULL for " + workflow);
+      return;
+    }
+
+    // Step 1: check for deletion - if the workflow is marked for deletion,
+    // clean it up and skip the remaining steps
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the 
workflow context.");
+      cleanupWorkflow(workflow);
+      return;
+    }
+
+    // Step 2: handle timeout, which has higher priority than STOP.
+    // Only generic workflows can time out and have a rebalance scheduled for
+    // the timeout; the set is skipped if the workflow has already timed out.
+    // Job queues ignore this step.
+    if (!workflowCfg.isJobQueue()
+        && !finalStates.contains(workflowCtx.getWorkflowState())) {
+      // If timeout point has already been passed, it will not be scheduled
+      scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), 
workflowCfg.getTimeout());
+
+      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && 
isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
+        workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
+        _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+      }
+
+      // Do not return right after setting the timeout: if the workflow is
+      // already stopped, marking it timed-out will not trigger the rebalance
+      // pipeline (we are not listening on PropertyStore changes), nor will a
+      // rebalance be scheduled for the timeout since the workflow has already
+      // timed out. Let the code proceed so future cleanup work gets scheduled.
+    }
+
+    // Step 3: handle workflows that should STOP.
+    // For workflows that have already reached a final state, STOP does not
+    // take effect.
+    if (!finalStates.contains(workflowCtx.getWorkflowState()) && 
TargetState.STOP.equals(targetState)) {
+      LOG.info("Workflow " + workflow + "is marked as stopped.");
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+      }
+      return;
+    }
+
+    long currentTime = System.currentTimeMillis();
+
+    // Step 4: check and process a finished workflow context (confusingly,
+    // the processing happens inside isWorkflowFinished()).
+    // Check whether the workflow has finished and mark it if so; also update
+    // the cluster status monitor if one is provided.
+    // Note that COMPLETED and FAILED are marked in markJobComplete /
+    // markJobFailed; this handles TIMED_OUT only.
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && 
isWorkflowFinished(workflowCtx,
+        workflowCfg, _taskDataCache.getJobConfigMap(), _taskDataCache)) {
+      workflowCtx.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowCtx, workflowCfg);
+      _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+    }
+
+    // Step 5: Handle finished workflows
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+      LOG.info("Workflow " + workflow + " is finished.");
+      long expiryTime = workflowCfg.getExpiry();
+      // Check if this workflow has been finished past its expiry.
+      if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
+        LOG.info("Workflow " + workflow + " passed expiry time, cleaning up 
the workflow context.");
+        cleanupWorkflow(workflow);
+      } else {
+        // schedule future cleanup work
+        long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
+        _rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
+      }
+      return;
+    }
+
+    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
+      Set<String> jobWithFinalStates = new 
HashSet<>(workflowCtx.getJobStates().keySet());
+      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
+      if (jobWithFinalStates.size() > 0) {
+        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
+        workflowCtx.removeJobStates(jobWithFinalStates);
+        workflowCtx.removeJobStartTime(jobWithFinalStates);
+      }
+    }
+
+    _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+  }
+
+  public void assignWorkflow(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
+    // Fetch workflow configuration and context
+    if (workflowCfg == null) {
+      // Already logged in status update.
+      return;
+    }
+
+    if (!isWorkflowReadyForSchedule(workflowCfg)) {
+      LOG.info("Workflow " + workflow + " is not ready to schedule");
+      // set the timer to trigger future schedule
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow,
+          workflowCfg.getStartTime().getTime());
+      return;
+    }
+
+    // Check for readiness, and stop processing if it's not ready
+    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, 
workflowCtx, _taskDataCache);
+    if (isReady) {
+      // Schedule jobs from this workflow.
+      scheduleJobs(workflow, workflowCfg, workflowCtx, 
_taskDataCache.getJobConfigMap(), _taskDataCache);
+    } else {
+      LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
+    }
+    _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+  }
+
+  public WorkflowContext getOrInitializeWorkflowContext(
+      String workflowName, TaskDataCache cache) {
+    WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName);
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new 
ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+      workflowCtx.setName(workflowName);
+      LOG.debug("Workflow context is created for " + workflowName);
+    }
+    return workflowCtx;
+  }
+
+  /**
+   * Figure out whether the jobs in the workflow should be run, and schedule
+   * those that are ready
+   */
+  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
+      TaskDataCache clusterDataCache) {
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    if (scheduleConfig != null && scheduleConfig.isRecurring()) {
+      LOG.debug("Jobs from recurring workflow are not schedule-able");
+      return;
+    }
+
+    int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, 
workflowCtx);
+    int scheduledJobs = 0;
+    long timeToSchedule = Long.MAX_VALUE;
+    for (String job : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = workflowCtx.getJobState(job);
+      if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Job " + job + " is already started or completed.");
+        }
+        continue;
+      }
+
+      if (workflowCfg.isJobQueue() && scheduledJobs >= 
workflowCfg.getParallelJobs()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Workflow %s already have enough job in 
progress, "
+              + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, 
scheduledJobs));
+        }
+        break;
+      }
+
+      // check ancestor job status
+      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, 
inCompleteAllJobCount, jobConfigMap,
+          clusterDataCache)) {
+        JobConfig jobConfig = jobConfigMap.get(job);
+        if (jobConfig == null) {
+          LOG.error(String.format("The job config is missing for job %s", 
job));
+          continue;
+        }
+
+        // Since the start time is calculated based on the completion time of
+        // this job's parent jobs, it should only be calculated once. Persist
+        // the calculated time in the WorkflowContext znode.
+        long calculatedStartTime = workflowCtx.getJobStartTime(job);
+        if (calculatedStartTime < 0) {
+          // Calculate the start time if it is not already calculated
+          calculatedStartTime = System.currentTimeMillis();
+          // If the start time has not been calculated before, do the math.
+          if (jobConfig.getExecutionDelay() >= 0) {
+            calculatedStartTime += jobConfig.getExecutionDelay();
+          }
+          calculatedStartTime = Math.max(calculatedStartTime, 
jobConfig.getExecutionStart());
+          workflowCtx.setJobStartTime(job, calculatedStartTime);
+        }
+
+        // The start time has not arrived yet. Set a trigger and update the start time.
+        if (System.currentTimeMillis() < calculatedStartTime) {
+          timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
+        } else {
+          scheduleSingleJob(job, jobConfig);
+          workflowCtx.setJobState(job, TaskState.NOT_STARTED);
+          scheduledJobs++;
+        }
+      }
+    }
+    long currentScheduledTime =
+        _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
+            : _rebalanceScheduler.getRebalanceTime(workflow);
+    if (timeToSchedule < currentScheduledTime) {
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow, 
timeToSchedule);
+    }
+  }
+
+  /**
+   * Posts a new job to the cluster
+   */
+  private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
+    HelixAdmin admin = _manager.getClusterManagmentTool();
+
+    IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), 
jobResource);
+    if (jobIS != null) {
+      LOG.info("Job " + jobResource + " idealstate already exists!");
+      return;
+    }
+
+    // Set up job resource based on partitions from target resource
+    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
+        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
+
+    int numPartitions = numIndependentTasks;
+    if (numPartitions == 0) {
+      IdealState targetIs =
+          admin.getResourceIdealState(_manager.getClusterName(), 
jobConfig.getTargetResource());
+      if (targetIs == null) {
+        LOG.warn("Target resource does not exist for job " + jobResource);
+        // No need to fail here; the job will be marked as failed immediately
+        // when it starts running.
+      } else {
+        numPartitions = targetIs.getPartitionSet().size();
+      }
+    }
+
+    admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
+        TaskConstants.STATE_MODEL_NAME);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Set the job configuration
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    
resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), 
taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), 
resourceConfig);
+
+    // Push out new ideal state based on number of target partitions
+    IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
+    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
+    builder.setNumReplica(1);
+    builder.setNumPartitions(numPartitions);
+    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+
+    if (jobConfig.getInstanceGroupTag() != null) {
+      builder.setNodeGroup(jobConfig.getInstanceGroupTag());
+    }
+
+    if (jobConfig.isDisableExternalView()) {
+      builder.disableExternalView();
+    }
+
+    jobIS = builder.build();
+    for (int i = 0; i < numPartitions; i++) {
+      jobIS.getRecord().setListField(jobResource + "_" + i, new 
ArrayList<String>());
+      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, 
String>());
+    }
+    jobIS.setRebalancerClassName(JobRebalancer.class.getName());
+    admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
+  }
+
+  /**
+   * Check if a workflow is ready to schedule, and schedule a rebalance if it
+   * is not
+   * @param workflow the Helix resource associated with the workflow
+   * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
+   * @return true if the workflow is ready for schedule, false if not ready
+   */
+  private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig 
workflowCfg,
+      WorkflowContext workflowCtx, TaskDataCache cache) {
+    // non-scheduled workflow is ready to run immediately.
+    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+      return true;
+    }
+
+    // Figure out when this should be run, and if it's ready, then just run it
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    Date startTime = scheduleConfig.getStartTime();
+    long currentTime = new Date().getTime();
+    long delayFromStart = startTime.getTime() - currentTime;
+
+    if (delayFromStart <= 0) {
+      // Recurring workflows are just templates that spawn new workflows
+      if (scheduleConfig.isRecurring()) {
+        // Skip scheduling this workflow if it's not in a start state
+        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip scheduling since the workflow has not been started 
" + workflow);
+          }
+          return false;
+        }
+
+        // Skip scheduling this workflow again if the previous run (if any) is
+        // still active
+        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+        if (lastScheduled != null) {
+          WorkflowContext lastWorkflowCtx = 
cache.getWorkflowContext(lastScheduled);
+          if (lastWorkflowCtx != null
+              && lastWorkflowCtx.getFinishTime() == 
WorkflowContext.UNFINISHED) {
+            LOG.info("Skip scheduling since last schedule has not completed 
yet " + lastScheduled);
+            return false;
+          }
+        }
+
+        // Figure out how many periods have elapsed, and thus the time at
+        // which to schedule the next workflow. The negation of the delay is
+        // the amount of time past the start time.
+        long period =
+            
scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+        long offsetMultiplier = (-delayFromStart) / period;
+        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+        // Now clone the workflow if this clone has not yet been created
+        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        String newWorkflowName = workflow + "_" + df.format(new 
Date(timeToSchedule));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ready to start workflow " + newWorkflowName);
+        }
+        if (lastScheduled == null || !newWorkflowName.equals(lastScheduled)) {
+          Workflow clonedWf =
+              cloneWorkflow(_manager, workflow, newWorkflowName, new 
Date(timeToSchedule));
+          TaskDriver driver = new TaskDriver(_manager);
+          if (clonedWf != null) {
+            try {
+              // Start the cloned workflow
+              driver.start(clonedWf);
+            } catch (Exception e) {
+              LOG.error("Failed to schedule cloned workflow " + 
newWorkflowName, e);
+              
_clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), 
TaskState.FAILED);
+            }
+          }
+          // Persist workflow start regardless of success, to avoid retrying
+          // and failing
+          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+        }
+
+        // Change the time to trigger the pipeline to that of the next run
+        _rebalanceScheduler.scheduleRebalance(_manager, workflow, 
(timeToSchedule + period));
+      } else {
+        // One-time workflow.
+        // Remove any timers that are past due for this workflow.
+        long scheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
+        if (scheduledTime > 0 && currentTime > scheduledTime) {
+          _rebalanceScheduler.removeScheduledRebalance(workflow);
+        }
+        return true;
+      }
+    } else {
+      // set the timer to trigger future schedule
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow, 
startTime.getTime());
+    }
+
+    return false;
+  }
+
+  /**
+   * Create a new workflow based on an existing one
+   * @param manager connection to Helix
+   * @param origWorkflowName the name of the existing workflow
+   * @param newWorkflowName the name of the new workflow
+   * @param newStartTime a provided start time that deviates from the desired
+   *          start time
+   * @return the cloned workflow, or null if there was a problem cloning the
+   *         existing one
+   */
+  public static Workflow cloneWorkflow(HelixManager manager, String 
origWorkflowName,
+      String newWorkflowName, Date newStartTime) {
+    // Read all resources, including the workflow and jobs of interest
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    if (!resourceConfigMap.containsKey(origWorkflowName)) {
+      LOG.error("No such workflow named " + origWorkflowName);
+      return null;
+    }
+    if (resourceConfigMap.containsKey(newWorkflowName)) {
+      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+      return null;
+    }
+
+    // Create a new workflow with a new name
+    Map<String, String> workflowConfigsMap =
+        resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
+    WorkflowConfig.Builder workflowConfigBlder = 
WorkflowConfig.Builder.fromMap(workflowConfigsMap);
+
+    // Set the schedule, if applicable
+    if (newStartTime != null) {
+      ScheduleConfig scheduleConfig = 
ScheduleConfig.oneTimeDelayedStart(newStartTime);
+      workflowConfigBlder.setScheduleConfig(scheduleConfig);
+    }
+    workflowConfigBlder.setTerminable(true);
+
+    WorkflowConfig workflowConfig = workflowConfigBlder.build();
+
+    JobDag jobDag = workflowConfig.getJobDag();
+    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
+    workflowBuilder.setWorkflowConfig(workflowConfig);
+
+    // Add each job back as long as the original exists
+    Set<String> namespacedJobs = jobDag.getAllNodes();
+    for (String namespacedJob : namespacedJobs) {
+      if (resourceConfigMap.containsKey(namespacedJob)) {
+        // Copy over job-level and task-level configs
+        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, 
namespacedJob);
+        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
+        Map<String, String> jobSimpleFields = 
jobConfig.getRecord().getSimpleFields();
+
+        JobConfig.Builder jobCfgBuilder = 
JobConfig.Builder.fromMap(jobSimpleFields);
+
+        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
+        Map<String, Map<String, String>> rawTaskConfigMap = 
jobConfig.getRecord().getMapFields();
+        List<TaskConfig> taskConfigs = Lists.newLinkedList();
+        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
+          taskConfigs.add(taskConfig);
+        }
+        jobCfgBuilder.addTaskConfigs(taskConfigs);
+        workflowBuilder.addJob(job, jobCfgBuilder);
+
+        // Add dag dependencies
+        Set<String> children = parentsToChildren.get(namespacedJob);
+        if (children != null) {
+          for (String namespacedChild : children) {
+            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, 
namespacedChild);
+            workflowBuilder.addParentChildDependency(job, child);
+          }
+        }
+      }
+    }
+    return workflowBuilder.build();
+  }
+
+  /**
+   * Clean up a workflow. This removes the workflow config, idealstate,
+   * externalview and workflow contexts associated with this workflow, and all
+   * job information, including job configs, contexts, idealstates and
+   * externalviews.
+   */
+  private void cleanupWorkflow(String workflow) {
+    LOG.info("Cleaning up workflow: " + workflow);
+    WorkflowConfig workflowcfg = _taskDataCache.getWorkflowConfig(workflow);
+
+    if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == 
TargetState.DELETE) {
+      Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
+      // Remove all pending timer tasks for this workflow, if any exist
+      _rebalanceScheduler.removeScheduledRebalance(workflow);
+      for (String job : jobs) {
+        _rebalanceScheduler.removeScheduledRebalance(job);
+      }
+      if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
+          _manager.getHelixPropertyStore(), workflow, jobs)) {
+        LOG.warn("Failed to clean up workflow " + workflow);
+      } else {
+        // Only remove from the cache when the whole workflow removal succeeds.
+        // Otherwise the batch write would clean all the contexts even though
+        // the configs and idealstates still exist, and all the workflows and
+        // jobs would be rescheduled again.
+        removeContexts(workflow, jobs, _taskDataCache);
+      }
+    } else {
+      LOG.info("Did not clean up workflow " + workflow
+          + " because the workflow is neither terminable nor marked for DELETE.");
+    }
+  }
+
+  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache 
cache) {
+    if (jobs != null) {
+      for (String job : jobs) {
+        cache.removeContext(job);
+      }
+    }
+    cache.removeContext(workflow);
+  }
+}
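
The recurrence arithmetic in scheduleWorkflowIfReady above snaps the spawn time
to the most recent period boundary at or before now. Restated as a standalone
fragment using the same names as the code:

    long period = scheduleConfig.getRecurrenceUnit()
        .toMillis(scheduleConfig.getRecurrenceInterval());
    long delayFromStart = startTime.getTime() - System.currentTimeMillis(); // <= 0 here
    long offsetMultiplier = (-delayFromStart) / period; // whole periods elapsed
    long timeToSchedule = period * offsetMultiplier + startTime.getTime();
    // Example: startTime 10:00, period 1h, now 12:40 -> offsetMultiplier = 2,
    // timeToSchedule = 12:00; the clone is named "<workflow>_<UTC timestamp of
    // 12:00>" and the next pipeline trigger fires at timeToSchedule + period.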

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6851475..c913131 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,531 +19,43 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 /**
  * Custom rebalancer implementation for the {@code Workflow} in task state 
model.
  */
 public class WorkflowRebalancer extends TaskRebalancer {
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowRebalancer.class);
-  private static final Set<TaskState> finalStates = new HashSet<>(
-      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, 
TaskState.TIMED_OUT));
+  private WorkflowDispatcher _workflowDispatcher = new WorkflowDispatcher();
 
   @Override
   public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache 
clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput 
currStateOutput) {
     final String workflow = resource.getResourceName();
+    long startTime = System.currentTimeMillis();
     LOG.debug("Computer Best Partition for workflow: " + workflow);
-
-    // Fetch workflow configuration and context
+    _workflowDispatcher.init(_manager);
+    WorkflowContext workflowCtx = _workflowDispatcher
+        .getOrInitializeWorkflowContext(workflow, 
clusterData.getTaskDataCache());
     WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflow);
-    if (workflowCfg == null) {
-      LOG.warn("Workflow configuration is NULL for " + workflow);
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    WorkflowContext workflowCtx = getOrInitializeWorkflowContext(clusterData, 
workflow);
-
-    // Step 1: Check for deletion - if so, we don't need to go through further 
steps
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the 
workflow context.");
-      cleanupWorkflow(workflow, workflowCfg, clusterData.getTaskDataCache());
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    // Step 2: handle timeout, which should have higher priority than STOP
-    // Only generic workflow get timeouted and schedule rebalance for timeout. 
Will skip the set if
-    // the workflow already got timeouted. Job Queue will ignore the setup.
-    if (!workflowCfg.isJobQueue() && 
!finalStates.contains(workflowCtx.getWorkflowState())) {
-      // If timeout point has already been passed, it will not be scheduled
-      scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), 
workflowCfg.getTimeout());
-
-      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
-          && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
-        workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
-        clusterData.updateWorkflowContext(workflow, workflowCtx);
-      }
-
-      // We should not return after setting timeout, as in case the workflow 
is stopped already
-      // marking it timeout will not trigger rebalance pipeline as we are not 
listening on
-      // PropertyStore change, nor will we schedule rebalance for timeout as 
at this point,
-      // workflow is already timed-out. We should let the code proceed and 
wait for schedule
-      // future cleanup work
-    }
 
-    // Step 3: handle workflow that should STOP
-    // For workflows that already reached final states, STOP should not take 
into effect
-    if (!finalStates.contains(workflowCtx.getWorkflowState())
-        && TargetState.STOP.equals(targetState)) {
-      LOG.info("Workflow " + workflow + "is marked as stopped.");
-      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-        clusterData.updateWorkflowContext(workflow, workflowCtx);
-      }
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
+    _workflowDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
+    _workflowDispatcher.updateCache(clusterData.getTaskDataCache());
+    _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, 
workflowCtx);
+    _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx);
 
-    long currentTime = System.currentTimeMillis();
-
-    // Step 4: Check and process finished workflow context (confusing,
-    // but its inside isWorkflowFinished())
-    // Check if workflow has been finished and mark it if it is. Also update 
cluster status
-    // monitor if provided
-    // Note that COMPLETE and FAILED will be marked in markJobComplete / 
markJobFailed
-    // This is to handle TIMED_OUT only
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && 
isWorkflowFinished(workflowCtx,
-        workflowCfg, clusterData.getJobConfigMap(), clusterData)) {
-      workflowCtx.setFinishTime(currentTime);
-      updateWorkflowMonitor(workflowCtx, workflowCfg);
-      clusterData.updateWorkflowContext(workflow, workflowCtx);
-    }
-
-    // Step 5: Handle finished workflows
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-      LOG.info("Workflow " + workflow + " is finished.");
-      long expiryTime = workflowCfg.getExpiry();
-      // Check if this workflow has been finished past its expiry.
-      if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
-        LOG.info("Workflow " + workflow + " passed expiry time, cleaning up 
the workflow context.");
-        cleanupWorkflow(workflow, workflowCfg, clusterData.getTaskDataCache());
-      } else {
-        // schedule future cleanup work
-        long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
-        _rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
-      }
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Workflow " + workflow + " is not ready to schedule");
-      // set the timer to trigger future schedule
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow,
-          workflowCfg.getStartTime().getTime());
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, 
workflowCtx, clusterData);
-    if (isReady) {
-      // Schedule jobs from this workflow.
-      scheduleJobs(workflow, workflowCfg, workflowCtx, 
clusterData.getJobConfigMap(), clusterData);
-    } else {
-      LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
-    }
-
-    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
-      Set<String> jobWithFinalStates = new 
HashSet<>(workflowCtx.getJobStates().keySet());
-      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
-      if (jobWithFinalStates.size() > 0) {
-        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
-        workflowCtx.removeJobStates(jobWithFinalStates);
-        workflowCtx.removeJobStartTime(jobWithFinalStates);
-      }
-    }
-
-    clusterData.updateWorkflowContext(workflow, workflowCtx);
+    LOG.debug(String.format("WorkflowRebalancer computation takes %d ms for 
workflow %s",
+        System.currentTimeMillis() - startTime, workflow));
     return buildEmptyAssignment(workflow, currStateOutput);
   }
 
-  private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache 
clusterData,
-      String workflowName) {
-    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowName);
-    if (workflowCtx == null) {
-      WorkflowConfig config = clusterData.getWorkflowConfig(workflowName);
-      workflowCtx = new WorkflowContext(new 
ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      workflowCtx.setName(workflowName);
-      LOG.debug("Workflow context is created for " + workflowName);
-    }
-    return workflowCtx;
-  }
-
-  /**
-   * Figure out whether the jobs in the workflow should be run,
-   * and if it's ready, then just schedule it
-   */
-  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    if (scheduleConfig != null && scheduleConfig.isRecurring()) {
-      LOG.debug("Jobs from recurring workflow are not schedule-able");
-      return;
-    }
-
-    int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, 
workflowCtx);
-    int scheduledJobs = 0;
-    long timeToSchedule = Long.MAX_VALUE;
-    for (String job : workflowCfg.getJobDag().getAllNodes()) {
-      TaskState jobState = workflowCtx.getJobState(job);
-      if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Job " + job + " is already started or completed.");
-        }
-        continue;
-      }
-
-      if (workflowCfg.isJobQueue() && scheduledJobs >= 
workflowCfg.getParallelJobs()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Workflow %s already have enough job in 
progress, "
-              + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, 
scheduledJobs));
-        }
-        break;
-      }
-
-      // check ancestor job status
-      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, 
inCompleteAllJobCount, jobConfigMap,
-          clusterDataCache)) {
-        JobConfig jobConfig = jobConfigMap.get(job);
-        if (jobConfig == null) {
-          LOG.error(String.format("The job config is missing for job %s", 
job));
-          continue;
-        }
-
-        // Since the start time is calculated base on the time of completion 
of parent jobs for this
-        // job, the calculated start time should only be calculate once. 
Persist the calculated time
-        // in WorkflowContext znode.
-        long calculatedStartTime = workflowCtx.getJobStartTime(job);
-        if (calculatedStartTime < 0) {
-          // Calculate the start time if it is not already calculated
-          calculatedStartTime = System.currentTimeMillis();
-          // If the start time is not calculated before, do the math.
-          if (jobConfig.getExecutionDelay() >= 0) {
-            calculatedStartTime += jobConfig.getExecutionDelay();
-          }
-          calculatedStartTime = Math.max(calculatedStartTime, 
jobConfig.getExecutionStart());
-          workflowCtx.setJobStartTime(job, calculatedStartTime);
-        }
-
-        // Time is not ready. Set a trigger and update the start time.
-        if (System.currentTimeMillis() < calculatedStartTime) {
-          timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
-        } else {
-          scheduleSingleJob(job, jobConfig);
-          workflowCtx.setJobState(job, TaskState.NOT_STARTED);
-          scheduledJobs++;
-        }
-      }
-    }
-    long currentScheduledTime =
-        _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
-            : _rebalanceScheduler.getRebalanceTime(workflow);
-    if (timeToSchedule < currentScheduledTime) {
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow, 
timeToSchedule);
-    }
-  }
-
-  /**
-   * Posts new job to cluster
-   */
-  private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
-    HelixAdmin admin = _manager.getClusterManagmentTool();
-
-    IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
-    if (jobIS != null) {
-      LOG.info("Job " + jobResource + " idealstate already exists!");
-      return;
-    }
-
-    // Set up job resource based on partitions from target resource
-    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
-        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
-    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
-
-    int numPartitions = numIndependentTasks;
-    if (numPartitions == 0) {
-      IdealState targetIs =
-          admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
-      if (targetIs == null) {
-        LOG.warn("Target resource does not exist for job " + jobResource);
-        // No need to fail here; the job will be marked as failed immediately when it starts
-        // running.
-      } else {
-        numPartitions = targetIs.getPartitionSet().size();
-      }
-    }
-
-    admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
-        TaskConstants.STATE_MODEL_NAME);
-
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Set the job configuration
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    HelixProperty resourceConfig = new HelixProperty(jobResource);
-    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null) {
-      for (TaskConfig taskConfig : taskConfigMap.values()) {
-        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      }
-    }
-    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
-    // Push out new ideal state based on number of target partitions
-    IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
-    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
-    builder.setNumReplica(1);
-    builder.setNumPartitions(numPartitions);
-    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
-
-    if (jobConfig.getInstanceGroupTag() != null) {
-      builder.setNodeGroup(jobConfig.getInstanceGroupTag());
-    }
-
-    if (jobConfig.isDisableExternalView()) {
-      builder.disableExternalView();
-    }
-
-    jobIS = builder.build();
-    for (int i = 0; i < numPartitions; i++) {
-      jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
-      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
-    }
-    jobIS.setRebalancerClassName(JobRebalancer.class.getName());
-    admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
-  }
-
-  /**
-   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
-   * @param workflow the Helix resource associated with the workflow
-   * @param workflowCfg the workflow to check
-   * @param workflowCtx the current workflow context
-   * @return true if the workflow is ready for schedule, false if not ready
-   */
-  private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, ClusterDataCache cache) {
-    // A non-scheduled workflow is ready to run immediately.
-    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
-      return true;
-    }
-
-    // Figure out when this should be run, and if it's ready, then just run it
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    Date startTime = scheduleConfig.getStartTime();
-    long currentTime = new Date().getTime();
-    long delayFromStart = startTime.getTime() - currentTime;
-
-    if (delayFromStart <= 0) {
-      // Recurring workflows are just templates that spawn new workflows
-      if (scheduleConfig.isRecurring()) {
-        // Skip scheduling this workflow if it's not in a start state
-        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skip scheduling since the workflow has not been started 
" + workflow);
-          }
-          return false;
-        }
-
-        // Skip scheduling this workflow again if the previous run (if any) is still active
-        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
-        if (lastScheduled != null) {
-          WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
-          if (lastWorkflowCtx != null
-              && lastWorkflowCtx.getFinishTime() == 
WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed 
yet " + lastScheduled);
-            return false;
-          }
-        }
-
-        // Figure out how many jumps are needed, thus the time to schedule the next workflow
-        // The negative of the delay is the amount of time past the start time
-        long period =
-            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
-        long offsetMultiplier = (-delayFromStart) / period;
-        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
-        // Now clone the workflow if this clone has not yet been created
-        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Ready to start workflow " + newWorkflowName);
-        }
-        if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf =
-              cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
-          TaskDriver driver = new TaskDriver(_manager);
-          if (clonedWf != null) {
-            try {
-              // Start the cloned workflow
-              driver.start(clonedWf);
-            } catch (Exception e) {
-              LOG.error("Failed to schedule cloned workflow " + 
newWorkflowName, e);
-              
_clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), 
TaskState.FAILED);
-            }
-          }
-          // Persist workflow start regardless of success to avoid retrying and failing
-          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-        }
-
-        // Change the time to trigger the pipeline to that of the next run
-        _rebalanceScheduler.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
-      } else {
-        // One-time workflow.
-        // Remove any timers that are past-time for this workflow
-        long scheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
-        if (scheduledTime > 0 && currentTime > scheduledTime) {
-          _rebalanceScheduler.removeScheduledRebalance(workflow);
-        }
-        return true;
-      }
-    } else {
-      // set the timer to trigger future schedule
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow, startTime.getTime());
-    }
-
-    return false;
-  }
-
-  /**
-   * Create a new workflow based on an existing one
-   * @param manager connection to Helix
-   * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName the name of the new workflow
-   * @param newStartTime a provided start time that deviates from the desired start time
-   * @return the cloned workflow, or null if there was a problem cloning the existing one
-   */
-  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
-      String newWorkflowName, Date newStartTime) {
-    // Read all resources, including the workflow and jobs of interest
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Map<String, HelixProperty> resourceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
-    if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
-      return null;
-    }
-    if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
-      return null;
-    }
 
-    // Create a new workflow with a new name
-    Map<String, String> workflowConfigsMap =
-        resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
-    WorkflowConfig.Builder workflowConfigBlder = WorkflowConfig.Builder.fromMap(workflowConfigsMap);
-
-    // Set the schedule, if applicable
-    if (newStartTime != null) {
-      ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-      workflowConfigBlder.setScheduleConfig(scheduleConfig);
-    }
-    workflowConfigBlder.setTerminable(true);
-
-    WorkflowConfig workflowConfig = workflowConfigBlder.build();
-
-    JobDag jobDag = workflowConfig.getJobDag();
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-
-    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
-    workflowBuilder.setWorkflowConfig(workflowConfig);
-
-    // Add each job back as long as the original exists
-    Set<String> namespacedJobs = jobDag.getAllNodes();
-    for (String namespacedJob : namespacedJobs) {
-      if (resourceConfigMap.containsKey(namespacedJob)) {
-        // Copy over job-level and task-level configs
-        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
-        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
-        Map<String, String> jobSimpleFields = 
jobConfig.getRecord().getSimpleFields();
-
-        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
-
-        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
-        Map<String, Map<String, String>> rawTaskConfigMap = 
jobConfig.getRecord().getMapFields();
-        List<TaskConfig> taskConfigs = Lists.newLinkedList();
-        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
-          taskConfigs.add(taskConfig);
-        }
-        jobCfgBuilder.addTaskConfigs(taskConfigs);
-        workflowBuilder.addJob(job, jobCfgBuilder);
-
-        // Add dag dependencies
-        Set<String> children = parentsToChildren.get(namespacedJob);
-        if (children != null) {
-          for (String namespacedChild : children) {
-            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
-            workflowBuilder.addParentChildDependency(job, child);
-          }
-        }
-      }
-    }
-    return workflowBuilder.build();
-  }
-
-  /**
-   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
-   * contexts associated with this workflow, and all job information, including their configs,
-   * contexts, IdealStates and ExternalViews.
-   */
-  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
-      TaskDataCache taskDataCache) {
-    LOG.info("Cleaning up workflow: " + workflow);
-
-    if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
-      Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
-      // Remove all pending timer tasks for this workflow, if any exist
-      _rebalanceScheduler.removeScheduledRebalance(workflow);
-      for (String job : jobs) {
-        _rebalanceScheduler.removeScheduledRebalance(job);
-      }
-      if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
-          _manager.getHelixPropertyStore(), workflow, jobs)) {
-        LOG.warn("Failed to clean up workflow " + workflow);
-      } else {
-        // Only remove from the cache when the whole workflow removal succeeds. Otherwise, the
-        // batch write will clean all the contexts even though Configs and IdealStates still
-        // exist, and all the workflows and jobs will be rescheduled again.
-        removeContexts(workflow, jobs, taskDataCache);
-      }
-    } else {
-      LOG.info("Did not clean up workflow " + workflow
-          + " because neither the workflow is non-terminable nor is set to 
DELETE.");
-    }
-  }
-
-  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
-    if (jobs != null) {
-      for (String job : jobs) {
-        cache.removeContext(job);
-      }
-    }
-    cache.removeContext(workflow);
-  }
 
   @Override
  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
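
For readers tracing the removed logic above: the recurring-workflow branch of
scheduleWorkflowIfReady derives the next spawn time by jumping forward whole
periods from the configured start time. A minimal standalone sketch of that
arithmetic, assuming startTimeMs <= nowMs and periodMs > 0 (the method name is
illustrative, not part of the commit):

    // Compute when the recurring template should spawn its next workflow.
    // Mirrors the offsetMultiplier math in the removed scheduleWorkflowIfReady.
    static long nextSpawnTime(long startTimeMs, long periodMs, long nowMs) {
      long pastStart = nowMs - startTimeMs;          // time elapsed past the start time
      long offsetMultiplier = pastStart / periodMs;  // completed whole periods ("jumps")
      return startTimeMs + offsetMultiplier * periodMs;
    }

For example, with a start at t=0ms, a period of 60000ms, and now=150000ms, the
spawn time is 120000ms, and the pipeline is re-triggered at 120000 + 60000 = 180000ms.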

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/scripts/integration-test/script/pexpect.py
----------------------------------------------------------------------
diff --git a/helix-core/src/main/scripts/integration-test/script/pexpect.py b/helix-core/src/main/scripts/integration-test/script/pexpect.py
index 6516cda..66c2d3c 100644
--- a/helix-core/src/main/scripts/integration-test/script/pexpect.py
+++ b/helix-core/src/main/scripts/integration-test/script/pexpect.py
@@ -355,12 +355,12 @@ class spawn (object):
         the input from the child and output sent to the child. Sometimes you
         don't want to see everything you write to the child. You only want to
         log what the child sends back. For example::
-        
+
             child = pexpect.spawn('some_command')
             child.logfile_read = sys.stdout
 
         To separately log output sent to the child use logfile_send::
-        
+
             self.logfile_send = fout
 
         The delaybeforesend helps overcome a weird behavior that many users
@@ -723,7 +723,7 @@ class spawn (object):
         if timeout == -1:
             timeout = self.timeout
         if timeout is not None:
-            end_time = time.time() + timeout 
+            end_time = time.time() + timeout
         while True:
             if not self.getecho():
                 return True
@@ -1374,7 +1374,7 @@ class spawn (object):
         if timeout == -1:
             timeout = self.timeout
         if timeout is not None:
-            end_time = time.time() + timeout 
+            end_time = time.time() + timeout
         if searchwindowsize == -1:
             searchwindowsize = self.searchwindowsize
 
@@ -1672,7 +1672,7 @@ class searcher_string (object):
         # rescanning until we've read three more bytes.
         #
         # Sadly, I don't know enough about this interesting topic. /grahn
-        
+
         for index, s in self._strings:
             if searchwindowsize is None:
                 # the match, if any, can only be in the fresh data,
@@ -1751,7 +1751,7 @@ class searcher_re (object):
         'buffer' which have not been searched before.
 
         See class spawn for the 'searchwindowsize' argument.
-        
+
         If there is a match this returns the index of that string, and sets
         'start', 'end' and 'match'. Otherwise, returns -1."""
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 431b929..7495078 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -240,7 +240,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     // Check that the workflow only started after the start time (with a 1 second buffer)
     WorkflowContext workflowCtx = _driver.getWorkflowContext(jobName);
     long startTime = workflowCtx.getStartTime();
-    Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
+    Assert.assertTrue(startTime <= inFiveSeconds);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 361a672..5eba70a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -50,7 +50,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     LOG.info("Starting job-queue: " + queueName);
     JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
     List<String> currentJobNames = createAndEnqueueJob(queueBuild, 2);
-
+    queueBuild.setExpiry(1);
     _driver.start(queueBuild.build());
 
     WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
@@ -61,20 +61,21 @@ public class TestRecurringJobQueue extends TaskTestBase {
     _driver.pollForJobState(scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
-    _driver.delete(queueName);
-    Thread.sleep(500);
+    _driver.deleteAndWaitForCompletion(queueName, 5000);
 
     JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
     currentJobNames = createAndEnqueueJob(queueBuilder, 2);
 
-    _driver.createQueue(queueBuilder.build());
-
-
-    wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    _driver.start(queueBuilder.build());
 
     // ensure jobs are started and completed
-    scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    scheduledQueue = null;
+    while (scheduledQueue == null) {
+      wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    }
+
     namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
     _driver.pollForJobState(scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
 
@@ -97,8 +98,13 @@ public class TestRecurringJobQueue extends TaskTestBase {
     List<String> currentJobNames = createAndEnqueueJob(queueBuilder, 5);
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
-    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    WorkflowContext wCtx = null;
+    String scheduledQueue = null;
+
+    while (scheduledQueue == null) {
+      wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    }
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
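
The two wait loops added in this test class follow the same pattern; a minimal
extracted sketch, reusing the TaskDriver/TaskTestUtil helpers visible in this
diff (the helper name and sleep interval are illustrative assumptions):

    // Poll until the recurrent queue reports the workflow it last spawned.
    // pollForWorkflowContext blocks until the queue's context exists; the loop
    // then retries until the controller populates lastScheduledSingleWorkflow.
    private static String waitForScheduledWorkflow(TaskDriver driver, String queueName)
        throws Exception {
      String scheduledQueue = null;
      while (scheduledQueue == null) {
        WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(driver, queueName);
        scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
        if (scheduledQueue == null) {
          Thread.sleep(100); // back off briefly instead of spinning
        }
      }
      return scheduledQueue;
    }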
