Repository: helix
Updated Branches:
  refs/heads/master 442cd096d -> 84c2feabb


Create AbstractTaskDispatcher for future Task Framework

Refactor the existing rebalancer code so that the logic needed by the future
Task Framework is moved into AbstractTaskDispatcher.
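
For orientation, the resulting class hierarchy looks roughly like the sketch
below. This is a minimal illustration only: method bodies are elided, and the
assumption that TaskRebalancer now extends AbstractTaskDispatcher is inferred
from the JobRebalancer hunks further down, which call the moved methods through
inheritance.

    // Shared task-dispatch logic extracted by this commit.
    public abstract class AbstractTaskDispatcher {
      public void init(HelixManager manager) { /* ... */ }
    }

    // Existing rebalancers pick the shared logic up through inheritance.
    public abstract class TaskRebalancer extends AbstractTaskDispatcher { /* ... */ }
    public class JobRebalancer extends TaskRebalancer { /* ... */ }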


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/84c2feab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/84c2feab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/84c2feab

Branch: refs/heads/master
Commit: 84c2feabbc391e4069717d2a3c5b8c4adf32d946
Parents: 442cd09
Author: Junkai Xue <[email protected]>
Authored: Fri May 11 11:44:11 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Mon Jul 9 15:26:25 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      | 677 +++++++++++++++++++
 .../org/apache/helix/task/JobRebalancer.java    | 654 +++---------------
 .../org/apache/helix/task/TaskRebalancer.java   | 175 +----
 .../java/org/apache/helix/task/TaskUtil.java    |  63 +-
 .../apache/helix/task/WorkflowRebalancer.java   |   2 +-
 5 files changed, 827 insertions(+), 744 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
new file mode 100644
index 0000000..b447cf3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -0,0 +1,677 @@
+package org.apache.helix.task;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTaskDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTaskDispatcher.class);
+
+  // For connection management
+  protected HelixManager _manager;
+  protected static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
+  protected ClusterStatusMonitor _clusterStatusMonitor;
+
+  public void init(HelixManager manager) {
+    _manager = manager;
+  }
+
+  // Job Update related methods
+
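+  /**
+   * Iterates over the tasks assigned to instances in the previous run and updates their status
+   * in the job context based on the current state output: pending and requested state
+   * transitions are handled first, then each task is processed according to its current
+   * TaskPartitionState.
+   */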
+  public void updatePreviousAssignedTasksStatus(
+      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
+      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
+      ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
+      Set<Integer> assignedPartitions, Set<Integer> partitionsToDropFromIs,
+      Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
+      Set<Integer> skippedPartitions) {
+    // Iterate through all instances
+    for (String instance : prevInstanceToTaskAssignments.keySet()) {
+      if (excludedInstances.contains(instance)) {
+        continue;
+      }
+
+      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      // Used to keep track of partitions that are in one of the final states: COMPLETED,
+      // TIMED_OUT, TASK_ERROR, ERROR.
+      Set<Integer> donePartitions = new TreeSet<>();
+      for (int pId : pSet) {
+        final String pName = pName(jobResource, pId);
+        TaskPartitionState currState =
+            updateJobContextAndGetTaskCurrentState(currStateOutput, jobResource, pId, pName,
+                instance, jobCtx);
+
+        // Check for pending state transitions on this (partition, instance).
+        Message pendingMessage =
+            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+        if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
+          processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
+              pendingMessage, jobState, currState, paMap, assignedPartitions);
+          continue;
+        }
+
+        // Process any requested state transitions.
+        String requestedStateStr =
+            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
+        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
+          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Instance %s requested a state transition to %s for partition %s.", instance,
+                requestedState, pName));
+          }
+          continue;
+        }
+
+        switch (currState) {
+        case RUNNING: {
+          TaskPartitionState nextState = TaskPartitionState.RUNNING;
+          if (jobState.equals(TaskState.TIMING_OUT)) {
+            nextState = TaskPartitionState.TASK_ABORTED;
+          } else if (jobTgtState.equals(TargetState.STOP)) {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String
+                .format("Setting task partition %s state to %s on instance %s.", pName, nextState,
+                    instance));
+          }
+        }
+        break;
+        case STOPPED: {
+          TaskPartitionState nextState;
+          if (jobTgtState.equals(TargetState.START)) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String
+                .format("Setting task partition %s state to %s on instance %s.", pName, nextState,
+                    instance));
+          }
+        }
+        break;
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                pName, currState));
+          }
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+        }
+        break;
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case TASK_ABORTED:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
+                pName, currState, jobCtx.getPartitionInfo(pId)));
+          }
+          markPartitionError(jobCtx, pId, currState, true);
+          // The error policy is to fail the task as soon as a single partition fails for a
+          // specified maximum number of attempts or the task is in ABORTED state.
+          // Note that if the job is TIMED_OUT, an aborted task is not treated as a failure and
+          // will not fail the job; after all tasks are aborted, they will be dropped because of
+          // the job timeout.
+          if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
+            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
+                || currState.equals(TaskPartitionState.TASK_ABORTED)
+                || currState.equals(TaskPartitionState.ERROR)) {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("skippedPartitions: " + skippedPartitions);
+              }
+            } else {
+              // Mark the task to be started at some later time (if enabled)
+              markPartitionDelayed(jobCfg, jobCtx, pId);
+            }
+          }
+        }
+        break;
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing; the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the current ideal state.",
+                pName, currState));
+          }
+        }
+        break;
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the set of task partitions that are completed or in one of the error states.
+      pSet.removeAll(donePartitions);
+    }
+  }
+
+  /**
+   * Computes the partition name given the resource name and partition id.
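+   * For example, pName("myJob", 3) returns "myJob_3".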
+   */
+  protected String pName(String resource, int pId) {
+    return resource + "_" + pId;
+  }
+
+  /**
+   * An (instance, state) pair.
+   */
+  protected static class PartitionAssignment {
+    public final String _instance;
+    public final String _state;
+
+    PartitionAssignment(String instance, String state) {
+      _instance = instance;
+      _state = state;
+    }
+  }
+
+  private TaskPartitionState updateJobContextAndGetTaskCurrentState(
+      CurrentStateOutput currentStateOutput, String jobResource, Integer pId, String pName,
+      String instance, JobContext jobCtx) {
+    String currentStateString =
+        currentStateOutput.getCurrentState(jobResource, new Partition(pName), instance);
+    if (currentStateString == null) {
+      // Task state is either DROPPED or INIT
+      return jobCtx.getPartitionState(pId);
+    }
+    TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    jobCtx.setPartitionState(pId, currentState);
+    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+    if (taskMsg != null) {
+      jobCtx.setPartitionInfo(pId, taskMsg);
+    }
+    return currentState;
+  }
+
+  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId, String pName,
+      String instance, Message pendingMessage, TaskState jobState, TaskPartitionState currState,
+      Map<Integer, PartitionAssignment> paMap, Set<Integer> assignedPartitions) {
+
+    Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
+    if (stateMap != null) {
+      String prevState = stateMap.get(instance);
+      if (!pendingMessage.getToState().equals(prevState)) {
+        LOG.warn(String.format("Task pending to-state is %s while previous assigned state is %s."
+            + " This should not happen.", pendingMessage.getToState(), prevState));
+      }
+      if (jobState == TaskState.TIMING_OUT
+          && currState == TaskPartitionState.INIT
+          && prevState.equals(TaskPartitionState.RUNNING.name())) {
+        // While the job is timing out, if the task is pending on INIT->RUNNING, set it back to
+        // INIT so that Helix will cancel the transition.
+        paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
+        assignedPartitions.add(pId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format(
+              "Task partition %s has a pending state transition on instance %s INIT->RUNNING. "
+                  + "Setting it back to INIT so that Helix can cancel the transition (if enabled).",
+              pName, instance));
+        }
+      } else {
+        // Otherwise, just copy forward the state assignment from the previous ideal state.
+        paMap.put(pId, new PartitionAssignment(instance, prevState));
+        assignedPartitions.add(pId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format(
+              "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+              pName, instance, prevState));
+        }
+      }
+    }
+  }
+
+  protected static void markPartitionCompleted(JobContext ctx, int pId) {
+    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
+  }
+
+  protected static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+      boolean incrementAttempts) {
+    ctx.setPartitionState(pId, state);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    if (incrementAttempts) {
+      ctx.incrementNumAttempts(pId);
+    }
+  }
+
+  protected static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+      boolean incrementAttempts) {
+    for (int pId : ctx.getPartitionSet()) {
+      markPartitionError(ctx, pId, state, incrementAttempts);
+    }
+  }
+
+  protected static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
+    long delayInterval = cfg.getTaskRetryDelay();
+    if (delayInterval <= 0) {
+      return;
+    }
+    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
+    ctx.setNextRetryTime(p, nextStartTime);
+  }
+
+  protected void handleJobTimeout(JobContext jobCtx, WorkflowContext workflowCtx,
+      String jobResource, JobConfig jobCfg) {
+    jobCtx.setFinishTime(System.currentTimeMillis());
+    workflowCtx.setJobState(jobResource, TaskState.TIMED_OUT);
+    // Mark all INIT tasks as TASK_ABORTED
+    for (int pId : jobCtx.getPartitionSet()) {
+      if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT) {
+        jobCtx.setPartitionState(pId, TaskPartitionState.TASK_ABORTED);
+      }
+    }
+    _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT);
+    _rebalanceScheduler.removeScheduledRebalance(jobResource);
+    TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+  }
+
+  protected void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext,
+      WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap) {
+    markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap);
+    // Mark all INIT tasks as TASK_ABORTED
+    for (int pId : jobContext.getPartitionSet()) {
+      if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) {
+        jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED);
+      }
+    }
+    _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED);
+    _rebalanceScheduler.removeScheduledRebalance(jobName);
+    TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+  }
+
+  // Compute the real assignment from the theoretical calculation, with throttling and other
+  // logic applied
+  protected void handleAdditionalTaskAssignment(
+      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
+      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
+      WorkflowConfig workflowConfig, WorkflowContext workflowCtx, ClusterDataCache cache,
+      ResourceAssignment prevTaskToInstanceStateAssignment, Set<Integer> assignedPartitions,
+      Map<Integer, PartitionAssignment> paMap, Set<Integer> skippedPartitions,
+      TaskAssignmentCalculator taskAssignmentCal, Set<Integer> allPartitions, long currentTime,
+      Collection<String> liveInstances) {
+    // The excludeSet contains the set of task partitions that must be excluded from consideration
+    // when making any new assignments.
+    // This includes all completed, failed, delayed, and already assigned partitions.
+    Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+    addCompletedTasks(excludeSet, jobCtx, allPartitions);
+    addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+    excludeSet.addAll(skippedPartitions);
+    excludeSet.addAll(TaskUtil.getNonReadyPartitions(jobCtx, currentTime));
+    // Get instance->[partition, ...] mappings for the target resource.
+    Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
+        .getTaskAssignment(currStateOutput, prevTaskToInstanceStateAssignment, liveInstances,
+            jobCfg, jobCtx, workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
+
+    if (!TaskUtil.isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) {
+      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
+          jobCtx);
+    }
+
+    for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet()) {
+      String instance = entry.getKey();
+      if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
+        continue;
+      }
+      // 1. throttled by job configuration
+      // Contains the set of task partitions currently assigned to the instance.
+      Set<Integer> pSet = entry.getValue();
+      int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+      // 2. throttled by participant capacity
+      int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
+      if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
+        participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
+      }
+      int participantLimitation =
+          participantCapacity - cache.getParticipantActiveTaskCount(instance);
+      // New tasks to be assigned
+      int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
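+      // Worked example with hypothetical numbers: if the job allows 40 concurrent tasks per
+      // instance and 10 are already assigned here, jobCfgLimitation = 30; if the participant
+      // capacity is 50 with 35 active tasks, participantLimitation = 15, so numToAssign =
+      // min(30, 15) = 15.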
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), "
+                + "Participant Max Task(%d). Remaining capacity %d.", instance, jobCfgLimitation,
+            participantCapacity, numToAssign));
+      }
+      if (numToAssign > 0) {
+        Set<Integer> throttledSet = new HashSet<Integer>();
+        List<Integer> nextPartitions =
+            getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, throttledSet,
+                numToAssign);
+        for (Integer pId : nextPartitions) {
+          String pName = pName(jobResource, pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+          excludeSet.add(pId);
+          jobCtx.setAssignedParticipant(pId, instance);
+          jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
+          jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                TaskPartitionState.RUNNING, instance));
+          }
+        }
+        cache.setParticipantActiveTaskCount(instance,
+            cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
+        if (!throttledSet.isEmpty()) {
+          LOG.debug(
+              throttledSet.size() + " tasks are ready but throttled when assigned to participant.");
+        }
+      }
+    }
+  }
+
+  protected void scheduleForNextTask(String job, JobContext jobCtx, long now) {
+    // Figure out the earliest schedulable time in the future for a non-complete job
+    boolean shouldSchedule = false;
+    long earliestTime = Long.MAX_VALUE;
+    for (int p : jobCtx.getPartitionSet()) {
+      long retryTime = jobCtx.getNextRetryTime(p);
+      TaskPartitionState state = jobCtx.getPartitionState(p);
+      state = (state != null) ? state : TaskPartitionState.INIT;
+      Set<TaskPartitionState> errorStates =
+          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
+              TaskPartitionState.TIMED_OUT);
+      if (errorStates.contains(state) && retryTime > now && retryTime < earliestTime) {
+        earliestTime = retryTime;
+        shouldSchedule = true;
+      }
+    }
+
+    // If any was found, then schedule it
+    if (shouldSchedule) {
+      long scheduledTime = _rebalanceScheduler.getRebalanceTime(job);
+      if (scheduledTime == -1 || earliestTime < scheduledTime) {
+        _rebalanceScheduler.scheduleRebalance(_manager, job, earliestTime);
+      }
+    }
+  }
+
+
+  // Add all partitions that have been attempted the maximum number of times
+  protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
+      JobConfig cfg) {
+    for (Integer pId : pIds) {
+      if (isTaskGivenup(ctx, cfg, pId)) {
+        set.add(pId);
+      }
+    }
+  }
+
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, Set<Integer> throttled, int n) {
+    List<Integer> result = new ArrayList<Integer>();
+    for (Integer pId : candidatePartitions) {
+      if (!excluded.contains(pId)) {
+        if (result.size() < n) {
+          result.add(pId);
+        } else {
+          throttled.add(pId);
+        }
+      }
+    }
+    return result;
+  }
+
+  private static void addCompletedTasks(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds) {
+    for (Integer pId : pIds) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state == TaskPartitionState.COMPLETED) {
+        set.add(pId);
+      }
+    }
+  }
+
+  protected static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
+    TaskPartitionState state = ctx.getPartitionState(pId);
+    if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
+      return true;
+    }
+    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) {
+      return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
+    }
+    return false;
+  }
+
+  /**
+   * If the new assignment differs from the previous one, drop a running task that is no longer
+   * assigned to the same instance. Do not remove it from excludeSet, because the same task should
+   * not be assigned to the new instance right away.
+   */
+  private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
+      Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
+      JobContext jobContext) {
+    for (String instance : oldAssignment.keySet()) {
+      for (Integer pId : oldAssignment.get(instance)) {
+        if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING && !newAssignment
+            .get(instance).contains(pId)) {
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+          jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
+        }
+      }
+    }
+  }
+
+  protected void markJobComplete(String jobName, JobContext jobContext,
+      WorkflowConfig workflowConfig, WorkflowContext workflowContext,
+      Map<String, JobConfig> jobConfigMap) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.COMPLETED);
+    jobContext.setFinishTime(currentTime);
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
+      workflowContext.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowContext, workflowConfig);
+    }
+    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+  }
+
+  protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.FAILED);
+    if (jobContext != null) {
+      jobContext.setFinishTime(currentTime);
+    }
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
+      workflowContext.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowContext, workflowConfig);
+    }
+    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+  }
+
+  protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig,
+      long currentTime) {
+    long currentScheduledTime =
+        _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
+            ? Long.MAX_VALUE
+            : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
+    if (currentTime + jobConfig.getExpiry() < currentScheduledTime) {
+      _rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
+          currentTime + jobConfig.getExpiry());
+    }
+  }
+
+  // Workflow related methods
+
+  /**
+   * Checks if the workflow has finished (either completed or failed) and sets the state in the
+   * workflow context accordingly.
+   *
+   * @param ctx Workflow context containing job states
+   * @param cfg Workflow config containing the set of jobs
+   * @return true if the workflow
+   *            1. completed (all jobs are {@link TaskState#COMPLETED}),
+   *            2. failed (enough jobs are {@link TaskState#FAILED}), or
+   *            3. is {@link TaskState#TIMED_OUT};
+   *         false otherwise.
+   */
+  protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
+      Map<String, JobConfig> jobConfigMap) {
+    boolean incomplete = false;
+
+    TaskState workflowState = ctx.getWorkflowState();
+    if (TaskState.TIMED_OUT.equals(workflowState)) {
+      // We don't update job state here as JobRebalancer will do it
+      return true;
+    }
+
+    // Check if the failed job count is beyond the threshold and, if so, fail the workflow
+    // and abort in-progress jobs
+    int failedJobs = 0;
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      TaskState jobState = ctx.getJobState(job);
+      if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
+        failedJobs++;
+        if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
+          ctx.setWorkflowState(TaskState.FAILED);
+          for (String jobToFail : cfg.getJobDag().getAllNodes()) {
+            if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
+              ctx.setJobState(jobToFail, TaskState.ABORTED);
+              // Skip latency for aborted jobs since it would not accurately reflect job running time
+              if (_clusterStatusMonitor != null) {
+                _clusterStatusMonitor
+                    .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
+              }
+            }
+          }
+          return true;
+        }
+      }
+      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
+          && jobState != TaskState.TIMED_OUT) {
+        incomplete = true;
+      }
+    }
+
+    if (!incomplete && cfg.isTerminable()) {
+      ctx.setWorkflowState(TaskState.COMPLETED);
+      return true;
+    }
+
+    return false;
+  }
+
+  protected void updateWorkflowMonitor(WorkflowContext context,
+      WorkflowConfig config) {
+    if (_clusterStatusMonitor != null) {
+      _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(),
+          context.getFinishTime() - context.getStartTime());
+    }
+  }
+
+  // Common methods
+
+  protected Set<String> getExcludedInstances(String currentJobName, WorkflowConfig workflowCfg,
+      ClusterDataCache cache) {
+    Set<String> ret = new HashSet<String>();
+
+    if (!workflowCfg.isAllowOverlapJobAssignment()) {
+      // Exclude all instances that have been assigned other jobs' tasks
+      for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+        if (jobName.equals(currentJobName)) {
+          continue;
+        }
+        JobContext jobContext = cache.getJobContext(jobName);
+        if (jobContext == null) {
+          continue;
+        }
+        for (int pId : jobContext.getPartitionSet()) {
+          TaskPartitionState partitionState = jobContext.getPartitionState(pId);
+          if (partitionState == TaskPartitionState.INIT
+              || partitionState == TaskPartitionState.RUNNING) {
+            ret.add(jobContext.getAssignedParticipant(pId));
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Schedule the rebalancer timer for task framework elements
+   * @param resourceId       The resource id
+   * @param startTime        The resource start time
+   * @param timeoutPeriod    The resource timeout period. Will be -1 if it is not set.
+   */
+  protected void scheduleRebalanceForTimeout(String resourceId, long startTime,
+      long timeoutPeriod) {
+    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
+    long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
+    if (nextTimeout >= System.currentTimeMillis() && (
+        nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
+            || nextTimeout < nextRebalanceTime)) {
+      _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
+    }
+  }
+
+  /**
+   * Basic function to check whether a task framework resource (workflow or job) has timed out
+   * @param startTime       Resource start time
+   * @param timeoutPeriod   Resource timeout period. Will be -1 if it is not set.
+   * @return true if the resource has timed out, false otherwise
+   */
+  protected boolean isTimeout(long startTime, long timeoutPeriod) {
+    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
+    return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT
+        && nextTimeout <= System.currentTimeMillis();
+  }
+
+  private long getTimeoutTime(long startTime, long timeoutPeriod) {
+    return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
+        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
+        ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod;
+  }
+
+  /**
+   * Set the ClusterStatusMonitor for metrics update
+   */
+  public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) {
+    _clusterStatusMonitor = clusterStatusMonitor;
+  }
+}
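
As a usage illustration, a subclass is expected to be wired up roughly as
follows (a minimal sketch; MyTaskDispatcher is hypothetical, and in the actual
controller the manager and monitor are supplied by the rebalancer pipeline, as
the JobRebalancer changes below show):

    public class MyTaskDispatcher extends AbstractTaskDispatcher {
      void setUp(HelixManager manager, ClusterStatusMonitor monitor) {
        init(manager);                    // store the Helix connection
        setClusterStatusMonitor(monitor); // enable job/workflow metric updates
      }
      // Rebalance logic would then call updatePreviousAssignedTasksStatus(...)
      // and handleAdditionalTaskAssignment(...) to build the new assignment.
    }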

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index fd80229..5f05acc 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -36,6 +36,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
@@ -64,8 +65,9 @@ public class JobRebalancer extends TaskRebalancer {
   private static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   @Override
-  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+  public ResourceAssignment computeBestPossiblePartitionState(
+      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+      CurrentStateOutput currStateOutput) {
     final String jobName = resource.getResourceName();
     LOG.debug("Computer Best Partition for job: " + jobName);
 
@@ -101,8 +103,8 @@ public class JobRebalancer extends TaskRebalancer {
     TaskState workflowState = workflowCtx.getWorkflowState();
     TaskState jobState = workflowCtx.getJobState(jobName);
     // The job is already in a final state (completed/failed).
-    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED ||
-        jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
+        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
       LOG.info(String.format(
           "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
           workflowResource, jobName, workflowState, jobState));
@@ -116,8 +118,8 @@ public class JobRebalancer extends TaskRebalancer {
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
-    if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
-        workflowCtx, getInCompleteJobCount(workflowCfg, workflowCtx),
+    if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
+        workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
         clusterData.getJobConfigMap())) {
       LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -154,10 +156,40 @@ public class JobRebalancer extends TaskRebalancer {
       LOG.error("No available instance found for job!");
     }
 
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+    TargetState jobTgtState = workflowCfg.getTargetState();
+    jobState = workflowCtx.getJobState(jobName);
+    workflowState = workflowCtx.getWorkflowState();
+
+    if (jobState == TaskState.IN_PROGRESS && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
+        || TaskState.TIMED_OUT.equals(workflowState))) {
+      jobState = TaskState.TIMING_OUT;
+      workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
+    } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
+      // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
+      // Update running status in workflow context
+      if (jobTgtState == TargetState.STOP) {
+        if (TaskUtil.checkJobStopped(jobCtx)) {
+          workflowCtx.setJobState(jobName, TaskState.STOPPED);
+        } else {
+          workflowCtx.setJobState(jobName, TaskState.STOPPING);
+        }
+        // Workflow has been stopped if all in progress jobs are stopped
+        if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+          workflowCtx.setWorkflowState(TaskState.STOPPED);
+        } else {
+          workflowCtx.setWorkflowState(TaskState.STOPPING);
+        }
+      } else {
+        workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
+        // Workflow is in progress if any task is in progress
+        workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+      }
+    }
+
+    Set<Integer> partitionsToDrop = new TreeSet<>();
     ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances,
-            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
+        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, prevAssignment,
+            liveInstances, currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
@@ -181,74 +213,21 @@ public class JobRebalancer extends TaskRebalancer {
     return newAssignment;
   }
 
-  private Set<String> getExcludedInstances(String currentJobName,
-      WorkflowConfig workflowCfg, ClusterDataCache cache) {
-    Set<String> ret = new HashSet<String>();
-
-    if (!workflowCfg.isAllowOverlapJobAssignment()) {
-      // exclude all instances that has been assigned other jobs' tasks
-      for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-        if (jobName.equals(currentJobName)) {
-          continue;
-        }
-        JobContext jobContext = cache.getJobContext(jobName);
-        if (jobContext == null) {
-          continue;
-        }
-        for (int pId : jobContext.getPartitionSet()) {
-          TaskPartitionState partitionState = jobContext.getPartitionState(pId);
-          if (partitionState == TaskPartitionState.INIT || partitionState == TaskPartitionState.RUNNING) {
-            ret.add(jobContext.getAssignedParticipant(pId));
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
   private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevTaskToInstanceStateAssignment,
-      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
-      ClusterDataCache cache) {
-    TargetState jobTgtState = workflowConfig.getTargetState();
-    TaskState jobState = workflowCtx.getJobState(jobResource);
-    TaskState workflowState = workflowCtx.getWorkflowState();
+      WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
+      ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> liveInstances,
+      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx,
+      Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) {
 
-    if (jobState == TaskState.IN_PROGRESS && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
-        || TaskState.TIMED_OUT.equals(workflowState))) {
-      jobState = TaskState.TIMING_OUT;
-      workflowCtx.setJobState(jobResource, TaskState.TIMING_OUT);
-    } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
-      // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
-      // Update running status in workflow context
-      if (jobTgtState == TargetState.STOP) {
-        if (checkJobStopped(jobCtx)) {
-          workflowCtx.setJobState(jobResource, TaskState.STOPPED);
-        } else {
-          workflowCtx.setJobState(jobResource, TaskState.STOPPING);
-        }
-        // Workflow has been stopped if all in progress jobs are stopped
-        if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-          workflowCtx.setWorkflowState(TaskState.STOPPED);
-        } else {
-          workflowCtx.setWorkflowState(TaskState.STOPPING);
-        }
-      } else {
-        workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
-        // Workflow is in progress if any task is in progress
-        workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-      }
-    }
 
     // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
+    Set<Integer> assignedPartitions = new HashSet<>();
 
     // Used to keep track of tasks that have failed, but whose failure is acceptable
-    Set<Integer> skippedPartitions = new HashSet<Integer>();
+    Set<Integer> skippedPartitions = new HashSet<>();
 
     // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<>();
 
     Set<String> excludedInstances = getExcludedInstances(jobResource, workflowConfig, cache);
 
@@ -259,7 +238,8 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
-      String failureMsg = "Empty task partition mapping for job " + 
jobResource + ", marked the job as FAILED!";
+      String failureMsg =
+          "Empty task partition mapping for job " + jobResource + ", marked 
the job as FAILED!";
       LOG.info(failureMsg);
       jobCtx.setInfo(failureMsg);
       failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap());
@@ -268,7 +248,8 @@ public class JobRebalancer extends TaskRebalancer {
     }
 
     Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment, allPartitions);
+        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
+            allPartitions);
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
@@ -277,152 +258,14 @@ public class JobRebalancer extends TaskRebalancer {
               + " excludedInstances: " + excludedInstances);
     }
 
-    // Iterate through all instances
-    for (String instance : prevInstanceToTaskAssignments.keySet()) {
-      if (excludedInstances.contains(instance)) {
-        continue;
-      }
-
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: 
COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(jobResource, pId);
-        TaskPartitionState currState =
-            updateJobContextAndGetTaskCurrentState(currStateOutput, 
jobResource, pId, pName, instance, jobCtx);
-
-        // Check for pending state transitions on this (partition, instance).
-        Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), 
instance);
-        if (pendingMessage != null && 
!pendingMessage.getToState().equals(currState.name())) {
-          processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, 
pId, pName, instance,
-              pendingMessage, jobState, currState, paMap, assignedPartitions);
-          continue;
-        }
-
-        // Process any requested state transitions.
-        String requestedStateStr =
-            currStateOutput.getRequestedState(jobResource, new 
Partition(pName), instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = 
TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for 
instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, 
requestedState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Instance %s requested a state transition to %s for partition 
%s.", instance,
-                requestedState, pName));
-          }
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING: {
-          TaskPartitionState nextState = TaskPartitionState.RUNNING;
-          if (jobState == TaskState.TIMING_OUT) {
-            nextState = TaskPartitionState.TASK_ABORTED;
-          } else if (jobTgtState == TargetState.STOP) {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String
-                .format("Setting task partition %s state to %s on instance 
%s.", pName, nextState,
-                    instance));
-          }
-        }
-          break;
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (jobTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String
-                .format("Setting task partition %s state to %s on instance 
%s.", pName, nextState,
-                    instance));
-          }
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the 
context object.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
-                pName, currState));
-          }
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case TASK_ABORTED:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has error state %s with msg %s. Marking as 
such in rebalancer context.",
-                pName, currState, jobCtx.getPartitionInfo(pId)));
-          }
-          markPartitionError(jobCtx, pId, currState, true);
-          // The error policy is to fail the task as soon a single partition 
fails for a specified
-          // maximum number of attempts or task is in ABORTED state.
-          // But notice that if job is TIMED_OUT, aborted task won't be 
treated as fail and won't cause job fail.
-          // After all tasks are aborted, they will be dropped, because of job 
timeout.
-          if (jobState != TaskState.TIMED_OUT && jobState != 
TaskState.TIMING_OUT) {
-            if (jobCtx.getPartitionNumAttempts(pId) >= 
jobCfg.getMaxAttemptsPerTask()
-                || currState.equals(TaskPartitionState.TASK_ABORTED)
-                || currState.equals(TaskPartitionState.ERROR)) {
-              skippedPartitions.add(pId);
-              partitionsToDropFromIs.add(pId);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("skippedPartitions:" + skippedPartitions);
-              }
-            } else {
-              // Mark the task to be started at some later time (if enabled)
-              markPartitionDelayed(jobCfg, jobCtx, pId);
-            }
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is 
eligible to be reassigned.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has state %s. It will be dropped from the 
current ideal state.",
-                pName, currState));
-          }
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the 
error states.
-      pSet.removeAll(donePartitions);
-    }
+    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+        currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
+        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions);
 
     addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
-    if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()) {
+    if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg
+        .getFailureThreshold()) {
       if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
         failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap());
         return buildEmptyAssignment(jobResource, currStateOutput);
@@ -452,8 +295,7 @@ public class JobRebalancer extends TaskRebalancer {
     }
 
     if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
-      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx,
-          cache.getJobConfigMap());
+      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap());
       _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
           jobCtx.getFinishTime() - jobCtx.getStartTime());
       _rebalanceScheduler.removeScheduledRebalance(jobResource);
@@ -464,17 +306,7 @@ public class JobRebalancer extends TaskRebalancer {
     // If job is being timed out and no task is running (for whatever reason), idealState can be deleted and all tasks
     // can be dropped(note that Helix doesn't track whether the drop is success or not).
     if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
-      jobCtx.setFinishTime(System.currentTimeMillis());
-      workflowCtx.setJobState(jobResource, TaskState.TIMED_OUT);
-      // Mark all INIT task to TASK_ABORTED
-      for (int pId : jobCtx.getPartitionSet()) {
-        if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT) {
-          jobCtx.setPartitionState(pId, TaskPartitionState.TASK_ABORTED);
-        }
-      }
-      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT);
-      _rebalanceScheduler.removeScheduledRebalance(jobResource);
-      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+      handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
       return buildEmptyAssignment(jobResource, currStateOutput);
     }
 
@@ -482,80 +314,19 @@ public class JobRebalancer extends TaskRebalancer {
     scheduleForNextTask(jobResource, jobCtx, currentTime);
 
     // Make additional task assignments if needed.
-    if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT && jobTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from 
consideration when making
-      // any new assignments.
-      // This includes all completed, failed, delayed, and already assigned 
partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedTasks(excludeSet, jobCtx, allPartitions);
-      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
-      excludeSet.addAll(skippedPartitions);
-      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments = 
taskAssignmentCal
-          .getTaskAssignment(currStateOutput, 
prevTaskToInstanceStateAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, 
cache.getIdealStates());
-
-      if (!isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) {
-        dropRebalancedRunningTasks(tgtPartitionAssignments, 
prevInstanceToTaskAssignments, paMap,
-            jobCtx);
-      }
-
-      for (Map.Entry<String, SortedSet<Integer>> entry : 
prevInstanceToTaskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
-            .contains(instance)) {
-          continue;
-        }
-        // 1. throttled by job configuration
-        // Contains the set of task partitions currently assigned to the 
instance.
-        Set<Integer> pSet = entry.getValue();
-        int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - 
pSet.size();
-        // 2. throttled by participant capacity
-        int participantCapacity = 
cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
-        if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) 
{
-          participantCapacity = 
cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
-        }
-        int participantLimitation = participantCapacity - 
cache.getParticipantActiveTaskCount(instance);
-        // New tasks to be assigned
-        int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format(
-              "Throttle tasks to be assigned to instance %s using limitation: 
Job Concurrent Task(%d), "
-                  + "Participant Max Task(%d). Remaining capacity %d.", 
instance, jobCfgLimitation,
-              participantCapacity, numToAssign));
-        }
-        if (numToAssign > 0) {
-          Set<Integer> throttledSet = new HashSet<Integer>();
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), 
excludeSet, throttledSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(jobResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            jobCtx.setAssignedParticipant(pId, instance);
-            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-            jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format("Setting task partition %s state to %s on instance 
%s.", pName,
-                      TaskPartitionState.RUNNING, instance));
-            }
-          }
-          cache.setParticipantActiveTaskCount(instance,
-              cache.getParticipantActiveTaskCount(instance) + 
nextPartitions.size());
-          if (!throttledSet.isEmpty()) {
-            LOG.debug(
-                throttledSet.size() + "tasks are ready but throttled when 
assigned to participant.");
-          }
-        }
-      }
+    if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
+        && jobTgtState == TargetState.START) {
+      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+          currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
+          prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
+          taskAssignmentCal, allPartitions, currentTime, liveInstances);
     }
 
     return toResourceAssignment(jobResource, paMap);
   }
 
-  private ResourceAssignment toResourceAssignment(String jobResource, Map<Integer, PartitionAssignment> paMap) {
+  private ResourceAssignment toResourceAssignment(String jobResource,
+      Map<Integer, PartitionAssignment> paMap) {
     // Construct a ResourceAssignment object from the map of partition assignments.
     ResourceAssignment ra = new ResourceAssignment(jobResource);
     for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
@@ -566,95 +337,6 @@ public class JobRebalancer extends TaskRebalancer {
     return ra;
   }
 
-  /**
-   * If assignment is different from previous assignment, drop the old running task if it's no
-   * longer assigned to the same instance, but do not remove it from excludeSet because the same
-   * task should not be assigned to the new instance right away.
-   */
-  private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
-      Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
-      JobContext jobContext) {
-    for (String instance : oldAssignment.keySet()) {
-      for (Integer pId : oldAssignment.get(instance)) {
-        if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING
-            && !newAssignment.get(instance).contains(pId)) {
-          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-          jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
-        }
-      }
-    }
-  }
-
-  private TaskPartitionState updateJobContextAndGetTaskCurrentState(CurrentStateOutput currentStateOutput,
-      String jobResource, Integer pId, String pName, String instance, JobContext jobCtx) {
-    String currentStateString = currentStateOutput.getCurrentState(jobResource, new Partition(
-        pName), instance);
-    if (currentStateString == null) {
-      // Task state is either DROPPED or INIT
-      return jobCtx.getPartitionState(pId);
-    }
-    TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(
-        pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
-    }
-    return currentState;
-  }
-
-  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId, String pName,
-      String instance, Message pendingMessage, TaskState jobState, TaskPartitionState currState,
-      Map<Integer, PartitionAssignment> paMap, Set<Integer> assignedPartitions) {
-
-    Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-    if (stateMap != null) {
-      String prevState = stateMap.get(instance);
-      if (!pendingMessage.getToState().equals(prevState)) {
-        LOG.warn(String.format(
-            "Task pending to-state is %s while previous assigned state is %s. This should not happen.",
-            pendingMessage.getToState(), prevState));
-      }
-      if (jobState == TaskState.TIMING_OUT
-          && currState == TaskPartitionState.INIT
-          && prevState.equals(TaskPartitionState.RUNNING.name())) {
-        // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
-        // so that Helix will cancel the transition.
-        paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
-        assignedPartitions.add(pId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format(
-              "Task partition %s has a pending state transition on instance %s INIT->RUNNING. "
-                  + "Setting it back to INIT so that Helix can cancel the transition (if enabled).",
-              pName, instance));
-        }
-      } else {
-        // Otherwise, just copy forward the state assignment from the previous ideal state.
-        paMap.put(pId, new PartitionAssignment(instance, prevState));
-        assignedPartitions.add(pId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format(
-              "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-              pName, instance, prevState));
-        }
-      }
-    }
-  }
-
-  private void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext,
-      WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap) {
-    markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap);
-    // Mark all INIT tasks as TASK_ABORTED
-    for (int pId : jobContext.getPartitionSet()) {
-      if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) {
-        jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED);
-      }
-    }
-    _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED);
-    _rebalanceScheduler.removeScheduledRebalance(jobName);
-    TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
-  }
-
   private boolean isJobFinished(JobContext jobContext, String jobResource,
       CurrentStateOutput currentStateOutput) {
     for (int pId : jobContext.getPartitionSet()) {
@@ -663,56 +345,19 @@ public class JobRebalancer extends TaskRebalancer {
       String instance = jobContext.getAssignedParticipant(pId);
      Message pendingMessage = currentStateOutput.getPendingState(jobResource, partition, instance);
      // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
-      if (state == TaskPartitionState.RUNNING
-          || (state == TaskPartitionState.INIT && pendingMessage != null)) {
+      if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT
+          && pendingMessage != null)) {
         return false;
       }
     }
     return true;
   }
 
-  private void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) {
-    long currentTime = System.currentTimeMillis();
-    workflowContext.setJobState(jobName, TaskState.COMPLETED);
-    jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
-      workflowContext.setFinishTime(currentTime);
-      updateWorkflowMonitor(workflowContext, workflowConfig);
-    }
-    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
-  }
-
-  private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
-    // Figure out the earliest schedulable time in the future of a non-complete job
-    boolean shouldSchedule = false;
-    long earliestTime = Long.MAX_VALUE;
-    for (int p : jobCtx.getPartitionSet()) {
-      long retryTime = jobCtx.getNextRetryTime(p);
-      TaskPartitionState state = jobCtx.getPartitionState(p);
-      state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
-      if (errorStates.contains(state) && retryTime > now && retryTime < earliestTime) {
-        earliestTime = retryTime;
-        shouldSchedule = true;
-      }
-    }
-
-    // If any was found, then schedule it
-    if (shouldSchedule) {
-      long scheduledTime = _rebalanceScheduler.getRebalanceTime(job);
-      if (scheduledTime == -1 || earliestTime < scheduledTime) {
-        _rebalanceScheduler.scheduleRebalance(_manager, job, earliestTime);
-      }
-    }
-  }
-
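
The removed scheduleForNextTask above schedules at most one future rebalance, at the earliest
retry time later than now among tasks sitting in error states. The core of that scan, reduced
to a plain map (hypothetical names; the error-state filter and the scheduler call are elided):

    import java.util.Map;

    public class RetrySketch {
      // Earliest retry time strictly after 'now', or -1 if none is pending.
      // retryTimes: partition id -> next retry timestamp in millis (the real
      // code reads these from JobContext for tasks in error states only).
      static long earliestFutureRetry(Map<Integer, Long> retryTimes, long now) {
        long earliest = Long.MAX_VALUE;
        for (long retryTime : retryTimes.values()) {
          if (retryTime > now && retryTime < earliest) {
            earliest = retryTime;
          }
        }
        return earliest == Long.MAX_VALUE ? -1 : earliest;
      }
    }
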
   /**
    * Get the last task assignment for a given job
    *
    * @param resourceName the name of the job
+   *
   * @return {@link ResourceAssignment} instance, or null if no assignment is available
    */
   private ResourceAssignment getPrevResourceAssignment(String resourceName) {
@@ -728,18 +373,16 @@ public class JobRebalancer extends TaskRebalancer {
    * @param resourceName the name of the job
   * @param ra           {@link ResourceAssignment} containing the task assignment
   */
-  private void setPrevResourceAssignment(String resourceName,
-      ResourceAssignment ra) {
+  private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) {
     _manager.getHelixPropertyStore()
         .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
             ra.getRecord(), AccessOption.PERSISTENT);
   }
   }
 
   /**
-   * Checks if the job has completed.
-   * Look at states of all tasks of the job, there're 3 kind: completed, given up, not given up.
-   * The job is completed if all tasks are completed or given up, and the number of given up tasks is within job
-   * failure threshold.
+   * Checks if the job has completed. Looks at the states of all tasks of the job; there are 3
+   * kinds: completed, given up, and not given up. The job is completed if all tasks are completed
+   * or given up, and the number of given-up tasks is within the job failure threshold.
    */
   private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, JobConfig cfg) {
     int numOfGivenUpTasks = 0;
@@ -747,7 +390,7 @@ public class JobRebalancer extends TaskRebalancer {
     for (Integer pId : allPartitions) {
       TaskPartitionState state = ctx.getPartitionState(pId);
       if (state != TaskPartitionState.COMPLETED) {
-        if(!isTaskGivenup(ctx, cfg, pId)) {
+        if (!isTaskGivenup(ctx, cfg, pId)) {
           return false;
         }
-        // If the task is given up, there's still chance the job has completed because of job failure threshold.
@@ -757,98 +400,23 @@ public class JobRebalancer extends TaskRebalancer {
     return numOfGivenUpTasks <= cfg.getFailureThreshold();
   }
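
As the javadoc above spells out, the completion rule is: every task is either COMPLETED or
given up, and the given-up count stays within the job's failure threshold. A toy rendering of
that rule over plain state labels (an assumption-level sketch, not the Helix API):

    import java.util.Arrays;
    import java.util.List;

    public class JobCompleteSketch {
      // states: "COMPLETED", "GIVEN_UP", or anything else (still runnable)
      static boolean isComplete(List<String> states, int failureThreshold) {
        int givenUp = 0;
        for (String s : states) {
          if ("GIVEN_UP".equals(s)) {
            givenUp++;
          } else if (!"COMPLETED".equals(s)) {
            return false; // some task can still make progress
          }
        }
        return givenUp <= failureThreshold;
      }

      public static void main(String[] args) {
        // Two given-up tasks: not complete at threshold 1, complete at threshold 2
        List<String> states = Arrays.asList("COMPLETED", "GIVEN_UP", "GIVEN_UP");
        System.out.println(isComplete(states, 1) + " " + isComplete(states, 2));
      }
    }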
 
-  private static void addCompletedTasks(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
-    TaskPartitionState state = ctx.getPartitionState(pId);
-    if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
-      return true;
-    }
-    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) {
-      return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-    }
-    return false;
-  }
-
-  // Add all partitions that have been tried maxNumberAttempts times
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, Set<Integer> throttled, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (!excluded.contains(pId)) {
-        if (result.size() < n) {
-          result.add(pId);
-        } else {
-          throttled.add(pId);
-        }
-      }
-    }
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
-  }
-
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
-  }
-
-  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
-
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
+   * @param prevAssignment    task partition -> (instance -> state)
    * @param allTaskPartitions all task partitionIds
+   *
   * @return instance -> partitionIds from previous assignment, if the instance is still live
   */
  private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment, Set<Integer> allTaskPartitions) {
+      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
+      Set<Integer> allTaskPartitions) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<Integer>());
     }
 
     for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = getPartitionId(partition.getPartitionName());
+      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
       if (allTaskPartitions.contains(pId)) {
         Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
         for (String instance : replicaMap.keySet()) {
@@ -862,67 +430,7 @@ public class JobRebalancer extends TaskRebalancer {
     return result;
   }
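
getPrevInstanceToTaskAssignments inverts the previous assignment's partition -> instances
mapping into instance -> partition ids, keeping only live instances, so tasks on dead
instances drop out and become eligible for reassignment. The same inversion in isolation,
with plain maps standing in for ResourceAssignment:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class InvertSketch {
      static Map<String, SortedSet<Integer>> invert(
          Map<Integer, Set<String>> partitionToInstances, Set<String> liveInstances) {
        Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
        for (String instance : liveInstances) {
          result.put(instance, new TreeSet<Integer>());
        }
        for (Map.Entry<Integer, Set<String>> e : partitionToInstances.entrySet()) {
          for (String instance : e.getValue()) {
            SortedSet<Integer> pIds = result.get(instance); // null if the instance went down
            if (pIds != null) {
              pIds.add(e.getKey());
            }
          }
        }
        return result;
      }
    }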
 
-  /* Extracts the partition id from the given partition name. */
-  private static int getPartitionId(String pName) {
-    int index = pName.lastIndexOf("_");
-    if (index == -1) {
-      throw new HelixException("Invalid partition name " + pName);
-    }
-    return Integer.valueOf(pName.substring(index + 1));
-  }
-
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
-      }
-    }
-    return nonReadyPartitions;
-  }
-
   private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
-    return isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal;
-  }
-
-  private boolean isGenericTaskJob(JobConfig jobConfig) {
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    return taskConfigMap != null && !taskConfigMap.isEmpty();
-  }
-
-  /**
-   * Check whether tasks are not in final states
-   * @param jobContext The job context
-   * @return           False if some tasks are not yet in a final state; otherwise, true
-   */
-  private boolean checkJobStopped(JobContext jobContext) {
-    for (int partition : jobContext.getPartitionSet()) {
-      TaskPartitionState taskState = jobContext.getPartitionState(partition);
-      if (taskState == TaskPartitionState.RUNNING) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  private String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
+    return TaskUtil.isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d8b8ea9..1d29368 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -43,79 +43,19 @@ import com.google.common.collect.Maps;
 /**
  * Abstract rebalancer class for the {@code Task} state model.
  */
-public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
+public abstract class TaskRebalancer extends AbstractTaskDispatcher
+    implements Rebalancer, MappingCalculator {
   private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
 
-  // For connection management
-  protected HelixManager _manager;
-  protected static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
-  protected ClusterStatusMonitor _clusterStatusMonitor;
-
-  @Override public void init(HelixManager manager) {
+  @Override
+  public void init(HelixManager manager) {
     _manager = manager;
   }
 
-  @Override public abstract ResourceAssignment computeBestPossiblePartitionState(
-      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
-      CurrentStateOutput currStateOutput);
-
-  /**
-   * Checks if the workflow has finished (either completed or failed).
-   * Set the state in workflow context properly.
-   *
-   * @param ctx Workflow context containing job states
-   * @param cfg Workflow config containing set of jobs
-   * @return returns true if the workflow
-   *            1. completed (all tasks are {@link TaskState#COMPLETED})
-   *            2. failed (any task is {@link TaskState#FAILED})
-   *            3. workflow is {@link TaskState#TIMED_OUT}
-   *         returns false otherwise.
-   */
-  protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
-      Map<String, JobConfig> jobConfigMap) {
-    boolean incomplete = false;
-
-    TaskState workflowState = ctx.getWorkflowState();
-    if (TaskState.TIMED_OUT.equals(workflowState)) {
-      // We don't update job state here as JobRebalancer will do it
-      return true;
-    }
-
-    // Check if failed job count is beyond threshold and if so, fail the workflow
-    // and abort in-progress jobs
-    int failedJobs = 0;
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      TaskState jobState = ctx.getJobState(job);
-      if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
-        failedJobs++;
-        if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
-          ctx.setWorkflowState(TaskState.FAILED);
-          for (String jobToFail : cfg.getJobDag().getAllNodes()) {
-            if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
-              ctx.setJobState(jobToFail, TaskState.ABORTED);
-              // Skip aborted jobs' latency since it is not an accurate latency for job running time
-              if (_clusterStatusMonitor != null) {
-                _clusterStatusMonitor
-                    .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
-              }
-            }
-          }
-          return true;
-        }
-      }
-      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
-          && jobState != TaskState.TIMED_OUT) {
-        incomplete = true;
-      }
-    }
-
-    if (!incomplete && cfg.isTerminable()) {
-      ctx.setWorkflowState(TaskState.COMPLETED);
-      return true;
-    }
 
-    return false;
-  }
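
To summarize the policy moving out of TaskRebalancer: a non-queue workflow fails once failed
or timed-out jobs exceed its failure threshold, and a terminable workflow completes only when
no job remains incomplete. A condensed sketch of that decision (string labels stand in for
TaskState; the job-abort and monitoring side effects are omitted):

    import java.util.Arrays;
    import java.util.Collection;

    public class WorkflowFinishSketch {
      static String decide(Collection<String> jobStates, int failureThreshold, boolean terminable) {
        int failed = 0;
        boolean incomplete = false;
        for (String s : jobStates) {
          if ("FAILED".equals(s) || "TIMED_OUT".equals(s)) {
            if (++failed > failureThreshold) {
              return "FAILED"; // threshold breached: the whole workflow fails
            }
          } else if (!"COMPLETED".equals(s)) {
            incomplete = true; // something is still running or not started
          }
        }
        return (!incomplete && terminable) ? "COMPLETED" : "IN_PROGRESS";
      }

      public static void main(String[] args) {
        // Two failed jobs against threshold 1 -> FAILED
        System.out.println(decide(Arrays.asList("COMPLETED", "FAILED", "FAILED"), 1, true));
      }
    }
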
+  @Override
+  public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput);
 
   /**
    * Checks if the workflow has been stopped.
@@ -224,64 +164,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return true;
   }
 
-  protected boolean isJobStarted(String job, WorkflowContext workflowContext) {
-    TaskState jobState = workflowContext.getJobState(job);
-    return (jobState != null && jobState != TaskState.NOT_STARTED);
-  }
-
-  /**
-   * Count the number of jobs in a workflow that are in progress.
-   *
-   * @param workflowCfg
-   * @param workflowCtx
-   * @return
-   */
-  protected int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
-    int inCompleteCount = 0;
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      TaskState jobState = workflowCtx.getJobState(jobName);
-      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
-        ++inCompleteCount;
-      }
-    }
-
-    return inCompleteCount;
-  }
-
-  protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) {
-    long currentTime = System.currentTimeMillis();
-    workflowContext.setJobState(jobName, TaskState.FAILED);
-    if (jobContext != null) {
-      jobContext.setFinishTime(currentTime);
-    }
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
-      workflowContext.setFinishTime(currentTime);
-      updateWorkflowMonitor(workflowContext, workflowConfig);
-    }
-    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
-  }
-
-  protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig,
-      long currentTime) {
-    long currentScheduledTime =
-        _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
-            ? Long.MAX_VALUE
-            : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
-    if (currentTime + jobConfig.getExpiry() < currentScheduledTime) {
-      _rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
-          currentTime + jobConfig.getExpiry());
-    }
-  }
-
-  protected void updateWorkflowMonitor(WorkflowContext context,
-      WorkflowConfig config) {
-    if (_clusterStatusMonitor != null) {
-      _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(),
-          context.getFinishTime() - context.getStartTime());
-    }
-  }
-
   /**
    * Check if a workflow is ready to schedule.
    *
@@ -294,41 +176,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
-  /**
-   * Basic function to check whether task framework resources (workflow and job) have timed out
-   * @param startTime       Resource start time
-   * @param timeoutPeriod   Resource timeout period. Will be -1 if it is not set.
-   * @return
-   */
-  protected boolean isTimeout(long startTime, long timeoutPeriod) {
-    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
-    return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT
-        && nextTimeout <= System.currentTimeMillis();
-  }
-
-  /**
-   * Schedule the rebalancer timer for task framework elements
-   * @param resourceId       The resource id
-   * @param startTime        The resource start time
-   * @param timeoutPeriod    The resource timeout period. Will be -1 if it is not set.
-   */
-  protected void scheduleRebalanceForTimeout(String resourceId, long startTime,
-      long timeoutPeriod) {
-    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
-    long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
-    if (nextTimeout >= System.currentTimeMillis() && (
-        nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
-            || nextTimeout < nextRebalanceTime)) {
-      _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
-    }
-  }
-
-  private long getTimeoutTime(long startTime, long timeoutPeriod) {
-    return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
-        ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod;
-  }
-
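
getTimeoutTime guards the startTime + timeoutPeriod addition against long overflow by checking
the remaining headroom before adding. The guard in isolation; DEFAULT_NEVER_TIMEOUT is assumed
here to be -1, matching its use as a sentinel in the surrounding code:

    public class TimeoutSketch {
      static final long NEVER = -1; // stands in for TaskConstants.DEFAULT_NEVER_TIMEOUT

      static long timeoutTime(long startTime, long timeoutPeriod) {
        // startTime + timeoutPeriod could wrap past Long.MAX_VALUE, so test the
        // headroom (Long.MAX_VALUE - startTime) before adding.
        return (timeoutPeriod == NEVER || timeoutPeriod > Long.MAX_VALUE - startTime)
            ? NEVER : startTime + timeoutPeriod;
      }

      public static void main(String[] args) {
        System.out.println(timeoutTime(1000L, 500L));           // 1500
        System.out.println(timeoutTime(1000L, Long.MAX_VALUE)); // -1, addition would overflow
      }
    }
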
   @Override
   public IdealState computeNewIdealState(String resourceName,
       IdealState currentIdealState, CurrentStateOutput currentStateOutput,
@@ -338,12 +185,4 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return currentIdealState;
   }
 
-
-
-  /**
-   * Set the ClusterStatusMonitor for metrics update
-   */
-  public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) {
-     _clusterStatusMonitor = clusterStatusMonitor;
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index c6d7a55..b3a7f29 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -488,10 +489,10 @@ public class TaskUtil {
    * @param pName
    * @return
    */
-  protected static int getPartitionId(String pName) {
+  public static int getPartitionId(String pName) {
     int index = pName.lastIndexOf("_");
     if (index == -1) {
-      throw new HelixException("Invalid partition name " + pName);
+      throw new HelixException(String.format("Invalid partition name %s", pName));
     }
     return Integer.valueOf(pName.substring(index + 1));
   }
@@ -817,4 +818,62 @@ public class TaskUtil {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));
   }
+
+  public static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+    Set<Integer> nonReadyPartitions = Sets.newHashSet();
+    for (int p : ctx.getPartitionSet()) {
+      long toStart = ctx.getNextRetryTime(p);
+      if (now < toStart) {
+        nonReadyPartitions.add(p);
+      }
+    }
+    return nonReadyPartitions;
+  }
+
+  public static boolean isGenericTaskJob(JobConfig jobConfig) {
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    return taskConfigMap != null && !taskConfigMap.isEmpty();
+  }
+
+  /**
+   * Check whether tasks are just started or still running
+   *
+   * @param jobContext The job context
+   *
+   * @return False if some tasks are still running; otherwise return true
+   */
+  public static boolean checkJobStopped(JobContext jobContext) {
+    for (int partition : jobContext.getPartitionSet()) {
+      TaskPartitionState taskState = jobContext.getPartitionState(partition);
+      if (taskState == TaskPartitionState.RUNNING) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Count the number of jobs in a workflow that are not in final state.
+   *
+   * @param workflowCfg
+   * @param workflowCtx
+   * @return
+   */
+  public static int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+    int inCompleteCount = 0;
+    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = workflowCtx.getJobState(jobName);
+      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+        ++inCompleteCount;
+      }
+    }
+
+    return inCompleteCount;
+  }
+
+  public static boolean isJobStarted(String job, WorkflowContext workflowContext) {
+    TaskState jobState = workflowContext.getJobState(job);
+    return (jobState != null && jobState != TaskState.NOT_STARTED);
+  }
 }
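
With getPartitionId now public, code outside the rebalancers can parse the task partition
naming convention (resource + "_" + id) produced by JobRebalancer's pName helper. A quick
usage sketch against the method as declared above; the job name is made up:

    import org.apache.helix.task.TaskUtil;

    public class PartitionNameSketch {
      public static void main(String[] args) {
        String pName = "myJob" + "_" + 7;          // how pName(resource, pId) builds the name
        int pId = TaskUtil.getPartitionId(pName);  // parses the suffix after the last '_'
        System.out.println(pId);                   // 7
        // Names without '_' throw HelixException("Invalid partition name ...")
      }
    }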

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 9233f9b..32eed2c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -196,7 +196,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       return;
     }
 
-    int inCompleteAllJobCount = getInCompleteJobCount(workflowCfg, workflowCtx);
+    int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx);
     int scheduledJobs = 0;
     long timeToSchedule = Long.MAX_VALUE;
     for (String job : workflowCfg.getJobDag().getAllNodes()) {
