This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit cb8c696e3e7e1b6a984a4eda0e789f5518591f06
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Wed Jun 10 11:26:09 2020 -0700

    Remove previousAssignment from FixedTargetTaskAssignmentCalculator (#1061)
    
    Remove previousAssignment from FixedTargetTaskAssignmentCalculator
    
    The FixedTargetTaskAssignmentCalculator class is relying on the 
previousAssignment.
    In this commit, this class has been modified and previousAssignment has been
    replaced with the context information.
    Also several legacy job and workflow rebalancers have been removed.
---
 .../java/org/apache/helix/model/IdealState.java    |   18 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  |   15 +-
 .../helix/task/DeprecatedTaskRebalancer.java       | 1149 --------------------
 .../task/FixedTargetTaskAssignmentCalculator.java  |   47 +-
 .../helix/task/FixedTargetTaskRebalancer.java      |   60 -
 .../task/GenericTaskAssignmentCalculator.java      |   12 +-
 .../apache/helix/task/GenericTaskRebalancer.java   |   57 -
 .../java/org/apache/helix/task/JobDispatcher.java  |   18 +-
 .../helix/task/TaskAssignmentCalculator.java       |   18 +
 .../ThreadCountBasedTaskAssignmentCalculator.java  |   10 +
 .../TestFixedTargetedTaskAssignmentCalculator.java |  288 +++++
 11 files changed, 386 insertions(+), 1306 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java 
b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 3274542..aafcca8 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -20,7 +20,9 @@ package org.apache.helix.model;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,8 +33,6 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.model.ResourceConfig.ResourceConfigProperty;
-import org.apache.helix.task.FixedTargetTaskRebalancer;
-import org.apache.helix.task.GenericTaskRebalancer;
 import org.apache.helix.task.JobRebalancer;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.task.WorkflowRebalancer;
@@ -74,6 +74,9 @@ public class IdealState extends HelixProperty {
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
+  public static final Set<String> LEGACY_TASK_REBALANCERS =
+      new 
HashSet<>(Arrays.asList("org.apache.helix.task.GenericTaskRebalancer",
+          "org.apache.helix.task.FixedTargetTaskRebalancer"));
 
   /**
    * Deprecated, use ResourceConfig.ResourceConfigConstants instead
@@ -721,11 +724,16 @@ public class IdealState extends HelixProperty {
       String rebalancerName = getRebalancerClassName();
       if (rebalancerName != null) {
         if (rebalancerName.equals(JobRebalancer.class.getName())
-            || rebalancerName.equals(WorkflowRebalancer.class.getName())
-            || rebalancerName.equals(GenericTaskRebalancer.class.getName())
-            || 
rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())) {
+            || rebalancerName.equals(WorkflowRebalancer.class.getName())) {
           property = RebalanceMode.TASK;
         } else {
+          if (LEGACY_TASK_REBALANCERS.contains(rebalancerName)) {
+            // Print a warning message if legacy TASK rebalancer is used
+            // Since legacy rebalancers have been removed, it is not safe just 
running legacy jobs and jobs/workflows
+            //with current task assignment strategy.
+            logger.warn("The rebalancer {} is not supported anymore. Setting 
rebalance mode to USER_DEFINED.",
+                rebalancerName);
+          }
           property = RebalanceMode.USER_DEFINED;
         }
       } else {
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index fa12203..ffbdcef 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -539,11 +539,10 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied 
throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, 
Set<String> excludedInstances,
-      String jobResource, CurrentStateOutput currStateOutput, JobContext 
jobCtx,
-      final JobConfig jobCfg, final WorkflowConfig workflowConfig, 
WorkflowContext workflowCtx,
-      final WorkflowControllerDataProvider cache,
-      ResourceAssignment prevTaskToInstanceStateAssignment,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+      Set<String> excludedInstances, String jobResource, CurrentStateOutput 
currStateOutput,
+      JobContext jobCtx, final JobConfig jobCfg, final WorkflowConfig 
workflowConfig,
+      WorkflowContext workflowCtx, final WorkflowControllerDataProvider cache,
       Map<String, Set<Integer>> assignedPartitions, Map<Integer, 
PartitionAssignment> paMap,
       Set<Integer> skippedPartitions, TaskAssignmentCalculator 
taskAssignmentCal,
       Set<Integer> allPartitions, final long currentTime, Collection<String> 
liveInstances) {
@@ -599,9 +598,9 @@ public abstract class AbstractTaskDispatcher {
 
     // The actual assignment is computed here
     // Get instance->[partition, ...] mappings for the target resource.
-    Map<String, SortedSet<Integer>> tgtPartitionAssignments = 
taskAssignmentCal.getTaskAssignment(
-        currStateOutput, prevTaskToInstanceStateAssignment, liveInstances, 
jobCfg, jobCtx,
-        workflowConfig, workflowCtx, filteredTaskPartitionNumbers, 
cache.getIdealStates());
+    Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+        taskAssignmentCal.getTaskAssignment(currStateOutput, liveInstances, 
jobCfg, jobCtx,
+            workflowConfig, workflowCtx, filteredTaskPartitionNumbers, 
cache.getIdealStates());
 
     if (!TaskUtil.isGenericTaskJob(jobCfg) && jobCfg.isRebalanceRunningTask()) 
{
       // TODO: Revisit the logic for isRebalanceRunningTask() and valid use 
cases for it
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
deleted file mode 100644
index 5202d41..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ /dev/null
@@ -1,1149 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Custom rebalancer implementation for the {@code Task} state model.
- */
-/** This rebalancer is deprecated, left here only for back-compatible. **/
-@Deprecated
-public abstract class DeprecatedTaskRebalancer
-    implements Rebalancer<WorkflowControllerDataProvider>,
-    MappingCalculator<WorkflowControllerDataProvider> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TaskRebalancer.class);
-
-  // Management of already-scheduled rebalances across jobs
-  private static final BiMap<String, Date> SCHEDULED_TIMES = 
HashBiMap.create();
-  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor();
-  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
-  // For connection management
-  private HelixManager _manager;
-
-  /**
-   * Get all the partitions that should be created by this task
-   * @param jobCfg the task configuration
-   * @param jobCtx the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param cache cluster snapshot
-   * @return set of partition numbers
-   */
-  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, 
JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, 
WorkflowControllerDataProvider cache);
-
-  /**
-   * Compute an assignment of tasks to instances
-   * @param currStateOutput the current state of the instances
-   * @param prevAssignment the previous task partition assignment
-   * @param instances the instances
-   * @param jobCfg the task configuration
-   * @param jobContext the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param partitionSet the partitions to assign
-   * @param cache cluster snapshot
-   * @return map of instances to set of partition numbers
-   */
-  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> 
partitionSet,
-      WorkflowControllerDataProvider cache);
-
-  @Override
-  public void init(HelixManager manager) {
-    _manager = manager;
-  }
-
-  @Override
-  public ResourceAssignment 
computeBestPossiblePartitionState(WorkflowControllerDataProvider clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput 
currStateOutput) {
-    final String resourceName = resource.getResourceName();
-    LOG.debug("Computer Best Partition for resource: " + resourceName);
-
-    // Fetch job configuration
-    JobConfig jobCfg = (JobConfig) clusterData.getResourceConfig(resourceName);
-    if (jobCfg == null) {
-      LOG.debug("Job configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    String workflowResource = jobCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = 
clusterData.getWorkflowConfig(workflowResource);
-    if (workflowCfg == null) {
-      LOG.debug("Workflow configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    WorkflowContext workflowCtx = 
clusterData.getWorkflowContext(workflowResource);
-
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new 
ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      workflowCtx.setName(workflowResource);
-      LOG.info("Workflow context for " + resourceName + " created!");
-    }
-
-    // check ancestor job status
-    int notStartedCount = 0;
-    int inCompleteCount = 0;
-    for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) 
{
-      TaskState jobState = workflowCtx.getJobState(ancestor);
-      if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++notStartedCount;
-      } else if (jobState == TaskState.IN_PROGRESS || jobState == 
TaskState.STOPPED) {
-        ++inCompleteCount;
-      }
-    }
-
-    if (notStartedCount > 0 || (workflowCfg.isJobQueue() && inCompleteCount >= 
workflowCfg
-        .getParallelJobs())) {
-      LOG.debug("Job is not ready to be scheduled due to pending dependent 
jobs " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info(
-          "Workflow is marked as deleted " + workflowResource
-              + " cleaning up the workflow context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check if this workflow has been finished past its expiry.
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= 
System.currentTimeMillis()) {
-      LOG.info("Workflow " + workflowResource
-          + " is completed and passed expiry time, cleaning up the workflow 
context.");
-      markForDeletion(_manager, workflowResource);
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Fetch any existing context information from the property store.
-
-    JobContext jobCtx = clusterData.getJobContext(resourceName);
-    if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
-      jobCtx.setStartTime(System.currentTimeMillis());
-      jobCtx.setName(resourceName);
-    }
-
-    // Check for expired jobs for non-terminable workflows
-    long jobFinishTime = jobCtx.getFinishTime();
-    if (!workflowCfg.isTerminable() && jobFinishTime != 
WorkflowContext.UNFINISHED
-        && jobFinishTime + workflowCfg.getExpiry() <= 
System.currentTimeMillis()) {
-      LOG.info("Job " + resourceName
-          + " is completed and passed expiry time, cleaning up the job 
context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // The job is already in a final state (completed/failed).
-    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
-        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
-      LOG.debug("Job " + resourceName + " is failed or already completed.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, 
resourceName, clusterData);
-    if (!isReady) {
-      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(_manager, 
resourceName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(resourceName);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped 
from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is 
required because of
-    // HELIX-230.
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-
-    ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, jobCfg, 
prevAssignment, clusterData
-            .getLiveInstances().keySet(), currStateOutput, workflowCtx, 
jobCtx, partitionsToDrop,
-            clusterData);
-
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = 
accessor.keyBuilder().idealStates(resourceName);
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update Workflow and Job context in data cache and ZK.
-    clusterData.updateJobContext(resourceName, jobCtx);
-    clusterData
-        .updateWorkflowContext(workflowResource, workflowCtx);
-
-    setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
-    LOG.debug("Job " + resourceName + " new assignment " + Arrays
-        .toString(newAssignment.getMappedPartitions().toArray()));
-
-    return newAssignment;
-  }
-
-  /**
-   * Get the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is 
available
-   */
-  private ResourceAssignment getPrevResourceAssignment(HelixManager manager,
-      String resourceName) {
-    ZNRecord r =
-        manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resourceName, PREV_RA_NODE),
-            null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  public void setPrevResourceAssignment(HelixManager manager, String 
resourceName,
-      ResourceAssignment ra) {
-    manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resourceName, PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
-      WorkflowConfig workflowCfg, WorkflowControllerDataProvider cache) {
-
-    Set<String> ret = new HashSet<String>();
-
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      if (jobName.equals(currentJobName)) {
-        continue;
-      }
-
-      JobContext jobContext = cache.getJobContext(jobName);
-      if (jobContext == null) {
-        continue;
-      }
-      for (int partition : jobContext.getPartitionSet()) {
-        TaskPartitionState partitionState = 
jobContext.getPartitionState(partition);
-        if (partitionState == TaskPartitionState.INIT ||
-            partitionState == TaskPartitionState.RUNNING) {
-          ret.add(jobContext.getAssignedParticipant(partition));
-        }
-      }
-    }
-
-    return ret;
-  }
-
-  private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment 
prevAssignment,
-      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> 
partitionsToDropFromIs,
-      WorkflowControllerDataProvider cache) {
-    TargetState jobTgtState = workflowConfig.getTargetState();
-
-    // Update running status in workflow context
-    if (jobTgtState == TargetState.STOP) {
-      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
-      // Workflow has been stopped if all jobs are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of tasks that have already been assigned to 
instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Used to keep track of tasks that have failed, but whose failure is 
acceptable
-    Set<Integer> skippedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, 
PartitionAssignment>();
-
-    Set<String> excludedInstances =
-        getInstancesAssignedToOtherJobs(jobResource, workflowConfig, cache);
-
-    // Process all the current assignments of tasks.
-    Set<Integer> allPartitions =
-        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, 
cache);
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, 
allPartitions);
-    long currentTime = System.currentTimeMillis();
-    for (String instance : taskAssignments.keySet()) {
-      if (excludedInstances.contains(instance)) {
-        continue;
-      }
-
-      Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: 
COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(jobResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        Message pendingMessage =
-            currStateOutput.getPendingMessage(jobResource, new 
Partition(pName), instance);
-        if (pendingMessage != null) {
-          // There is a pending state transition for this (partition, 
instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<String, String> stateMap = prevAssignment.getReplicaMap(new 
Partition(pName));
-          if (stateMap != null) {
-            String prevState = stateMap.get(instance);
-            paMap.put(pId, new PartitionAssignment(instance, prevState));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format(
-                      "Task partition %s has a pending state transition on 
instance %s. Using the previous ideal state which was %s.",
-                      pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        TaskPartitionState currState =
-            
TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new 
Partition(
-                pName), instance));
-        jobCtx.setPartitionState(pId, currState);
-
-        // Process any requested state transitions.
-        String requestedStateStr =
-            currStateOutput.getRequestedState(jobResource, new 
Partition(pName), instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = 
TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for 
instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, 
requestedState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format(
-              "Instance %s requested a state transition to %s for partition 
%s.", instance,
-              requestedState, pName));
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING:
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (jobTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
-              nextState, instance));
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the 
context object.
-          donePartitions.add(pId);
-          LOG.debug(String
-              .format(
-                  "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
-                  pName, currState));
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
-          LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in 
rebalancer context.",
-              pName, currState));
-          markPartitionError(jobCtx, pId, currState, true);
-          // The error policy is to fail the task as soon a single partition 
fails for a specified
-          // maximum number of attempts.
-          if (jobCtx.getPartitionNumAttempts(pId) >= 
jobCfg.getMaxAttemptsPerTask()) {
-            // If the user does not require this task to succeed in order for 
the job to succeed,
-            // then we don't have to fail the job right now
-            boolean successOptional = false;
-            String taskId = jobCtx.getTaskIdForPartition(pId);
-            if (taskId != null) {
-              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-              if (taskConfig != null) {
-                successOptional = taskConfig.isSuccessOptional();
-              }
-            }
-
-            // Similarly, if we have some leeway for how many tasks we can 
fail, then we don't have
-            // to fail the job immediately
-            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
-              successOptional = true;
-            }
-
-            if (!successOptional) {
-              long finishTime = currentTime;
-              workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              if (workflowConfig.isTerminable()) {
-                workflowCtx.setWorkflowState(TaskState.FAILED);
-                workflowCtx.setFinishTime(finishTime);
-              }
-              jobCtx.setFinishTime(finishTime);
-              markAllPartitionsError(jobCtx, currState, false);
-              addAllPartitions(allPartitions, partitionsToDropFromIs);
-              return emptyAssignment(jobResource, currStateOutput);
-            } else {
-              skippedPartitions.add(pId);
-              partitionsToDropFromIs.add(pId);
-            }
-          } else {
-            // Mark the task to be started at some later time (if enabled)
-            markPartitionDelayed(jobCfg, jobCtx, pId);
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is 
eligible to be reassigned.
-          donePartitions.add(pId);
-          LOG.debug(String.format(
-              "Task partition %s has state %s. It will be dropped from the 
current ideal state.",
-              pName, currState));
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the 
error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    // For delayed tasks, trigger a rebalance event for the closest upcoming 
ready time
-    scheduleForNextTask(jobResource, jobCtx, currentTime);
-
-    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
-      jobCtx.setFinishTime(currentTime);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-      }
-    }
-
-    // Make additional task assignments if needed.
-    if (jobTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from 
consideration when making
-      // any new assignments.
-      // This includes all completed, failed, delayed, and already assigned 
partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
-      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
-      excludeSet.addAll(skippedPartitions);
-      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, 
jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache);
-      for (Map.Entry<String, SortedSet<Integer>> entry : 
taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance) || 
excludedInstances.contains(instance)) {
-          continue;
-        }
-        // Contains the set of task partitions currently assigned to the 
instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - 
pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), 
excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(jobResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            jobCtx.setAssignedParticipant(pId, instance);
-            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-            LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition 
assignments.
-    ResourceAssignment ra = new ResourceAssignment(jobResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
-          ImmutableMap.of(pa._instance, pa._state));
-    }
-
-    return ra;
-  }
-
-  /**
-   * Check if a workflow is ready to schedule, and schedule a rebalance if it 
is not
-   * @param workflowCfg the workflow to check
-   * @param workflowCtx the current workflow context
-   * @param workflowResource the Helix resource associated with the workflow
-   * @param jobResource a job from the workflow
-   * @param cache the current snapshot of the cluster
-   * @return true if ready, false if not ready
-   */
-  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, 
WorkflowContext workflowCtx,
-      String workflowResource, String jobResource, 
WorkflowControllerDataProvider cache) {
-    // Ignore non-scheduled workflows
-    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
-      return true;
-    }
-
-    // Figure out when this should be run, and if it's ready, then just run it
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    Date startTime = scheduleConfig.getStartTime();
-    long currentTime = new Date().getTime();
-    long delayFromStart = startTime.getTime() - currentTime;
-
-    if (delayFromStart <= 0) {
-      // Remove any timers that are past-time for this workflow
-      Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
-      if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-        LOG.debug("Remove schedule timer for " + jobResource + " time: " + 
SCHEDULED_TIMES.get(jobResource));
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-
-      // Recurring workflows are just templates that spawn new workflows
-      if (scheduleConfig.isRecurring()) {
-        // Skip scheduling this workflow if it's not in a start state
-        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          LOG.debug(
-              "Skip scheduling since the workflow has not been started " + 
workflowResource);
-          return false;
-        }
-
-        // Skip scheduling this workflow again if the previous run (if any) is 
still active
-        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
-        if (lastScheduled != null) {
-          WorkflowContext lastWorkflowCtx = 
cache.getWorkflowContext(lastScheduled);
-          if (lastWorkflowCtx != null
-              && lastWorkflowCtx.getFinishTime() == 
WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed 
yet " + lastScheduled);
-            return false;
-          }
-        }
-
-        // Figure out how many jumps are needed, thus the time to schedule the 
next workflow
-        // The negative of the delay is the amount of time past the start time
-        long period =
-            
scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
-        long offsetMultiplier = (-delayFromStart) / period;
-        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
-        // Now clone the workflow if this clone has not yet been created
-        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
-        // Now clone the workflow if this clone has not yet been created
-        String newWorkflowName = workflowResource + "_" + df.format(new 
java.util.Date(timeToSchedule));
-        LOG.debug("Ready to start workflow " + newWorkflowName);
-        if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf =
-              cloneWorkflow(_manager, workflowResource, newWorkflowName, new 
Date(timeToSchedule));
-          TaskDriver driver = new TaskDriver(_manager);
-          try {
-            // Start the cloned workflow
-            driver.start(clonedWf);
-          } catch (Exception e) {
-            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, 
e);
-          }
-          // Persist workflow start regardless of success to avoid retrying 
and failing
-          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          cache.updateWorkflowContext(workflowResource, workflowCtx);
-        }
-
-        // Change the time to trigger the pipeline to that of the next run
-        startTime = new Date(timeToSchedule + period);
-        delayFromStart = startTime.getTime() - System.currentTimeMillis();
-      } else {
-        // This is a one-time workflow and is ready
-        return true;
-      }
-    }
-
-    scheduleRebalance(workflowResource, jobResource, startTime, 
delayFromStart);
-    return false;
-  }
-
-  /**
-   * Create a new workflow based on an existing one
-   * @param manager connection to Helix
-   * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName the name of the new workflow
-   * @param newStartTime a provided start time that deviates from the desired 
start time
-   * @return the cloned workflow, or null if there was a problem cloning the 
existing one
-   */
-  private Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
-      String newWorkflowName, Date newStartTime) {
-    // Read all resources, including the workflow and jobs of interest
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Map<String, HelixProperty> resourceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
-    if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
-      return null;
-    }
-    if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
-      return null;
-    }
-
-    // Create a new workflow with a new name
-    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
-    Map<String, String> wfSimpleFields = 
workflowConfig.getRecord().getSimpleFields();
-    JobDag jobDag =
-        
JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
-
-    // Set the workflow expiry
-    builder.setExpiry(
-        
Long.parseLong(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Expiry.name())));
-
-    // Set the schedule, if applicable
-    ScheduleConfig scheduleConfig;
-    if (newStartTime != null) {
-      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-    } else {
-      scheduleConfig = 
WorkflowConfig.parseScheduleFromConfigMap(wfSimpleFields);
-    }
-    if (scheduleConfig != null) {
-      builder.setScheduleConfig(scheduleConfig);
-    }
-
-    // Add each job back as long as the original exists
-    Set<String> namespacedJobs = jobDag.getAllNodes();
-    for (String namespacedJob : namespacedJobs) {
-      if (resourceConfigMap.containsKey(namespacedJob)) {
-        // Copy over job-level and task-level configs
-        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, 
namespacedJob);
-        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
-        Map<String, String> jobSimpleFields = 
jobConfig.getRecord().getSimpleFields();
-        jobSimpleFields.put(JobConfig.JobConfigProperty.WorkflowID.name(), 
newWorkflowName); // overwrite workflow name
-        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
-          builder.addConfig(job, e.getKey(), e.getValue());
-        }
-        Map<String, Map<String, String>> rawTaskConfigMap = 
jobConfig.getRecord().getMapFields();
-        List<TaskConfig> taskConfigs = Lists.newLinkedList();
-        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
-          taskConfigs.add(taskConfig);
-        }
-        builder.addTaskConfigs(job, taskConfigs);
-
-        // Add dag dependencies
-        Set<String> children = parentsToChildren.get(namespacedJob);
-        if (children != null) {
-          for (String namespacedChild : children) {
-            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, 
namespacedChild);
-            builder.addParentChildDependency(job, child);
-          }
-        }
-      }
-    }
-    return builder.build();
-  }
-
-  private void scheduleRebalance(String id, String jobResource, Date 
startTime, long delayFromStart) {
-    // Do nothing if there is already a timer set for the this workflow with 
the same start time.
-    if ((SCHEDULED_TIMES.containsKey(id) && 
SCHEDULED_TIMES.get(id).equals(startTime))
-        || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
-      LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is 
up to date.");
-      return;
-    }
-    LOG.info(
-        "Schedule rebalance with id: " + id + "and job: " + jobResource + " at 
time: " + startTime
-            + " delay from start: " + delayFromStart);
-
-    // For workflows not yet scheduled, schedule them and record it
-    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, 
jobResource);
-    SCHEDULED_TIMES.put(id, startTime);
-    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, 
TimeUnit.MILLISECONDS);
-  }
-
-  private void scheduleForNextTask(String jobResource, JobContext ctx, long 
now) {
-    // Clear current entries if they exist and are expired
-    long currentTime = now;
-    Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
-    if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-      LOG.debug(
-          "Remove schedule timer for" + jobResource + " time: " + 
SCHEDULED_TIMES.get(jobResource));
-      SCHEDULED_TIMES.remove(jobResource);
-    }
-
-    // Figure out the earliest schedulable time in the future of a 
non-complete job
-    boolean shouldSchedule = false;
-    long earliestTime = Long.MAX_VALUE;
-    for (int p : ctx.getPartitionSet()) {
-      long retryTime = ctx.getNextRetryTime(p);
-      TaskPartitionState state = ctx.getPartitionState(p);
-      state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, 
TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
-      if (errorStates.contains(state) && retryTime > currentTime && retryTime 
< earliestTime) {
-        earliestTime = retryTime;
-        shouldSchedule = true;
-      }
-    }
-
-    // If any was found, then schedule it
-    if (shouldSchedule) {
-      long delay = earliestTime - currentTime;
-      Date startTime = new Date(earliestTime);
-      scheduleRebalance(jobResource, jobResource, startTime, delay);
-    }
-  }
-
-  /**
-   * Checks if the job has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @param skippedPartitions partitions that failed, but whose failure is 
acceptable
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> 
allPartitions,
-      Set<Integer> skippedPartitions, JobConfig cfg) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (!skippedPartitions.contains(pId) && state != 
TaskPartitionState.COMPLETED
-          && !isTaskGivenup(ctx, cfg, pId)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has completed.
-   * @param ctx Workflow context containing job states
-   * @param cfg Workflow config containing set of jobs
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false 
otherwise.
-   */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, 
WorkflowConfig cfg) {
-    if (!cfg.isTerminable()) {
-      return false;
-    }
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.COMPLETED) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Checks if the workflow has been stopped.
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false 
otherwise.
-   */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig 
cfg) {
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != 
null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static void markForDeletion(HelixManager mgr, String resourceName) {
-    mgr.getConfigAccessor().set(
-        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.WorkflowConfigProperty.TargetState.name(), 
TargetState.DELETE.name());
-  }
-
-  /**
-   * Cleans up all Helix state associated with this job, wiping workflow-level 
information if this
-   * is the last remaining job in its workflow, and the workflow is terminable.
-   */
-  private static void cleanup(HelixManager mgr, final String resourceName, 
WorkflowConfig cfg,
-      String workflowResource) {
-    LOG.info("Cleaning up job: " + resourceName + " in workflow: " + 
workflowResource);
-    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-
-    // Remove any DAG references in workflow
-    PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag
-            
.fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        for (String child : jobDag.getDirectChildren(resourceName)) {
-          jobDag.getChildrenToParents().get(child).remove(resourceName);
-        }
-        for (String parent : jobDag.getDirectParents(resourceName)) {
-          jobDag.getParentsToChildren().get(parent).remove(resourceName);
-        }
-        jobDag.getChildrenToParents().remove(resourceName);
-        jobDag.getParentsToChildren().remove(resourceName);
-        jobDag.getAllNodes().remove(resourceName);
-        try {
-          currentData
-              
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), 
jobDag.toJson());
-        } catch (Exception e) {
-          LOG.equals("Could not update DAG for job: " + resourceName);
-        }
-        return currentData;
-      }
-    };
-    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
-        AccessOption.PERSISTENT);
-
-    // Delete resource configs.
-    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up job %s. Failed to remove 
node %s from Helix. Aborting further clean up steps.",
-          resourceName,
-          cfgKey));
-    }
-
-    // Delete property store information for this resource.
-    // For recurring workflow, it's OK if the node doesn't exist.
-    String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove 
node %s from Helix.",
-          resourceName, isKey));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    accessor.removeProperty(evKey);
-
-    LOG.info(String.format("Successfully cleaned up job resource %s.", 
resourceName));
-
-    boolean lastInWorkflow = true;
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      // check if property store information or resource configs exist for 
this job
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
-          AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
-        lastInWorkflow = false;
-        break;
-      }
-    }
-
-    // clean up workflow-level info if this was the last in workflow
-    if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == 
TargetState.DELETE)) {
-      // delete workflow config
-      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, 
workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. 
Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowCfgKey));
-      }
-      // Delete property store information for this workflow
-      String workflowPropStoreKey = 
getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, 
AccessOption.PERSISTENT)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. 
Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowPropStoreKey));
-      }
-      // Remove pending timer for this workflow if exists
-      if (SCHEDULED_TIMES.containsKey(workflowResource)) {
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-    }
-
-  }
-
-  private static String getRebalancerPropStoreKey(String resource) {
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resource);
-  }
-
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, 
String resource) {
-    return accessor.keyBuilder().idealStates(resource);
-  }
-
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, 
String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
-  }
-
-  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> 
destination) {
-    for (Integer pId : toAdd) {
-      destination.add(pId);
-    }
-  }
-
-  private static ResourceAssignment emptyAssignment(String name, 
CurrentStateOutput currStateOutput) {
-    ResourceAssignment assignment = new ResourceAssignment(name);
-    Set<Partition> partitions = 
currStateOutput.getCurrentStateMappedPartitions(name);
-    for (Partition partition : partitions) {
-      Map<String, String> currentStateMap = 
currStateOutput.getCurrentStateMap(name, partition);
-      Map<String, String> replicaMap = Maps.newHashMap();
-      for (String instanceName : currentStateMap.keySet()) {
-        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
-      }
-      assignment.addReplicaMap(partition, replicaMap);
-    }
-    return assignment;
-  }
-
-  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) 
{
-    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-  }
-
-  // add all partitions that have been tried maxNumberAttempts
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, 
Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> 
candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
-
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
-    }
-
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int 
p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
-  }
-
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(JobContext ctx, int pId, 
TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
-  }
-
-  private static void markAllPartitionsError(JobContext ctx, 
TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
-
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<String> instanceList, ResourceAssignment assignment, 
Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, 
SortedSet<Integer>>();
-    for (String instance : instanceList) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = pId(partition.getPartitionName());
-      if (includeSet.contains(pId)) {
-        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-    return result;
-  }
-
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
-      }
-    }
-    return nonReadyPartitions;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  protected static String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * Extracts the partition id from the given partition name.
-   */
-  protected static int pId(String pName) {
-    String[] tokens = pName.split("_");
-    return Integer.valueOf(tokens[tokens.length - 1]);
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
-  }
-
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState 
currentIdealState,
-      CurrentStateOutput currentStateOutput, WorkflowControllerDataProvider 
clusterData) {
-    // All of the heavy lifting is in the ResourceAssignment computation,
-    // so this part can just be a no-op.
-    return currentIdealState;
-  }
-
-  /**
-   * The simplest possible runnable that will trigger a run of the controller 
pipeline
-   */
-  private static class RebalanceInvoker implements Runnable {
-    private final HelixManager _manager;
-    private final String _resource;
-
-    public RebalanceInvoker(HelixManager manager, String resource) {
-      _manager = manager;
-      _resource = resource;
-    }
-
-    @Override
-    public void run() {
-      RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), 
_resource);
-    }
-  }
-}
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 5721921..8a29232 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -77,12 +77,22 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, 
JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
-    return computeAssignmentAndChargeResource(currStateOutput, prevAssignment, 
instances,
-        workflowCfg, jobCfg, jobContext, partitionSet, idealStateMap);
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, 
workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> 
partitionSet,
+      Map<String, IdealState> idealStateMap) {
+    return computeAssignmentAndChargeResource(currStateOutput, instances, 
workflowCfg, jobCfg,
+        jobContext, partitionSet, idealStateMap);
   }
 
   /**
@@ -175,7 +185,6 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
    * Calculate the assignment for given tasks. This assignment also charges 
resources for each task
    * and takes resource/quota availability into account while assigning.
    * @param currStateOutput
-   * @param prevAssignment
    * @param liveInstances
    * @param jobCfg
    * @param jobContext
@@ -184,9 +193,9 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
    * @return instance -> set of task partition numbers
    */
   private Map<String, SortedSet<Integer>> computeAssignmentAndChargeResource(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, WorkflowConfig workflowCfg, JobConfig 
jobCfg,
-      JobContext jobContext, Set<Integer> taskPartitionSet, Map<String, 
IdealState> idealStateMap) {
+      CurrentStateOutput currStateOutput, Collection<String> liveInstances,
+      WorkflowConfig workflowCfg, JobConfig jobCfg, JobContext jobContext,
+      Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
 
     // Note: targeted jobs also take up capacity in quota-based scheduling
     // "Charge" resources for the tasks
@@ -250,23 +259,27 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
             // the new assignment differs from prevAssignment, release. If the 
assigned instances
             // from old and new assignments are the same, then do nothing and 
let it keep running
             // The following checks if two assignments (old and new) differ
-            Map<String, String> instanceMap = prevAssignment.getReplicaMap(new 
Partition(pName));
-            Iterator<String> itr = instanceMap.keySet().iterator();
+
             // First, check if this taskPartition has been ever assigned 
before by checking
-            // prevAssignment
-            if (itr.hasNext()) {
-              String prevInstance = itr.next();
-              if (!prevInstance.equals(instance)) {
-                // Old and new assignments are different. We need to release 
from prevInstance, and
-                // this task will be assigned to a different instance
+            // jobContext's AssignedParticipant field
+            String prevAssignedInstance = 
jobContext.getAssignedParticipant(targetPartitionId);
+            TaskPartitionState taskState = 
jobContext.getPartitionState(targetPartitionId);
+
+            if (prevAssignedInstance != null && taskState != null
+                && (taskState.equals(TaskPartitionState.INIT)
+                    || taskState.equals(TaskPartitionState.RUNNING))) {
+              // If the task is in active state and old and new assignments 
are different, we need
+              // to release from prevInstance, and this task will be assigned 
to a different
+              // instance
+              if (!prevAssignedInstance.equals(instance)) {
                 if (_assignableInstanceManager.getAssignableInstanceNames()
-                    .contains(prevInstance)) {
-                  _assignableInstanceManager.release(prevInstance, taskConfig, 
quotaType);
+                    .contains(prevAssignedInstance)) {
+                  _assignableInstanceManager.release(prevAssignedInstance, 
taskConfig, quotaType);
                 } else {
                   // This instance must be no longer live
                   LOG.warn(
                       "Task {} was reassigned from old instance: {} to new 
instance: {}. However, old instance: {} is not found in AssignableInstanceMap. 
The old instance is possibly no longer a LiveInstance. This task will not be 
released.",
-                      pName, prevAssignment, instance);
+                      pName, prevAssignedInstance, instance, 
prevAssignedInstance);
                 }
               } else {
                 // Old and new assignments are the same, so just skip 
assignment for this
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
deleted file mode 100644
index 49cd4d6..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-
-import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.ResourceAssignment;
-/**
- * A rebalancer for when a task group must be assigned according to 
partitions/states on a target
- * resource. Here, tasks are colocated according to where a resource's 
partitions are, as well as
- * (if desired) only where those partitions are in a given state.
- */
-
-/**
- * This rebalancer is deprecated, left here only for back-compatible. *
- */
-@Deprecated
-public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer {
-  private FixedTargetTaskAssignmentCalculator taskAssignmentCalculator =
-      new FixedTargetTaskAssignmentCalculator();
-
-  @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, 
JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, 
WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, 
cache.getIdealStates());
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> 
partitionSet,
-      WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, 
jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
-  }
-}
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index c8989f8..10f2d82 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -64,10 +64,20 @@ public class GenericTaskAssignmentCalculator extends 
TaskAssignmentCalculator {
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, 
JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, 
workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, final JobContext 
jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> 
partitionSet,
+      Map<String, IdealState> idealStateMap) {
 
     if (jobCfg.getTargetResource() != null) {
       LOG.error(
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
deleted file mode 100644
index ab290d7..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-
-import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.ResourceAssignment;
-
-
-/**
- * This class does an assignment based on an automatic rebalancing strategy, 
rather than requiring
- * assignment to target partitions and states of another resource
- */
-/** This rebalancer is deprecated, left here only for back-compatible. **/
-@Deprecated
-public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
-  private GenericTaskAssignmentCalculator taskAssignmentCalculator =
-      new GenericTaskAssignmentCalculator();
-
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, 
WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, 
cache.getIdealStates());
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
-      ResourceAssignment prevAssignment, Collection<String> instances, 
JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
-      Set<Integer> partitionSet, WorkflowControllerDataProvider cache) {
-    return taskAssignmentCalculator
-        .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, 
jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 10a1b7c..191a2ea 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -185,8 +185,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     Set<Integer> partitionsToDrop = new TreeSet<>();
     ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, 
jobTgtState, prevAssignment,
-            liveInstances, currStateOutput, workflowCtx, jobCtx, 
partitionsToDrop, _dataProvider);
+        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, 
jobTgtState, liveInstances,
+            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, 
_dataProvider);
 
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
@@ -200,9 +200,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
   private ResourceAssignment computeResourceMapping(String jobResource,
       WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, 
TargetState jobTgtState,
-      ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> 
liveInstances,
-      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, 
JobContext jobCtx,
-      Set<Integer> partitionsToDropFromIs, WorkflowControllerDataProvider 
cache) {
+      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> 
partitionsToDropFromIs,
+      WorkflowControllerDataProvider cache) {
 
     // Used to keep track of tasks that have already been assigned to 
instances.
     // InstanceName -> Set of task partitions assigned to that instance in 
this iteration
@@ -316,10 +316,10 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, 
excludedInstances, jobResource,
-          currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
-          prevTaskToInstanceStateAssignment, assignedPartitions, paMap, 
skippedPartitions,
-          taskAssignmentCal, allPartitions, currentTime, liveInstances);
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, 
excludedInstances,
+          jobResource, currStateOutput, jobCtx, jobCfg, workflowConfig, 
workflowCtx, cache,
+          assignedPartitions, paMap, skippedPartitions, taskAssignmentCal, 
allPartitions,
+          currentTime, liveInstances);
     }
 
     return toResourceAssignment(jobResource, paMap);
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 8cbea97..793ba6c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -56,6 +56,7 @@ public abstract class TaskAssignmentCalculator {
    * @param idealStateMap the map of resource name map to ideal state
    * @return map of instances to set of partition numbers
    */
+  @Deprecated
   public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
@@ -63,6 +64,23 @@ public abstract class TaskAssignmentCalculator {
       Map<String, IdealState> idealStateMap);
 
   /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param instances the instances
+   * @param jobCfg the task configuration
+   * @param jobContext the task context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param idealStateMap the map of resource name map to ideal state
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, Collection<String> instances, 
JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap);
+
+  /**
    * Returns the correct type for this job. Note that if the parent workflow 
has a type, then all of
    * its jobs will inherit the type from the workflow.
    * @param workflowConfig
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index 129618b..ad66be0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -75,10 +75,20 @@ public class ThreadCountBasedTaskAssignmentCalculator 
extends TaskAssignmentCalc
   }
 
   @Override
+  @Deprecated
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, 
JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, 
workflowCfg,
+        workflowCtx, partitionSet, idealStateMap);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> 
partitionSet,
+      Map<String, IdealState> idealStateMap) {
 
     if (jobCfg.getTargetResource() != null) {
       LOG.error(
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
new file mode 100644
index 0000000..10283d0
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestFixedTargetedTaskAssignmentCalculator.java
@@ -0,0 +1,288 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.SortedSet;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This unit test makes sure that FixedTargetedTaskAssignmentCalculator makes 
correct decision for
+ * targeted jobs
+ */
+public class TestFixedTargetedTaskAssignmentCalculator {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_PARTITION_NAME = "0";
+  private static final int PARTITION_ID = 0;
+  private static final String TARGET_RESOURCES = "TestDB";
+
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a job has 
been assigned
+   * before and target partition is still on the same instance and in RUNNING 
state,
+   * we do not make new assignment for that task.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorSameInstanceRunningTask() 
{
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 1;
+    String slaveInstance1 = INSTANCE_PREFIX + 2;
+    String slaveInstance2 = INSTANCE_PREFIX + 3;
+    CurrentStateOutput currentStateOutput = 
prepareCurrentState(TaskPartitionState.RUNNING,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, 
slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, 
masterInstance);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new 
HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, 
jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(masterInstance).size(),0);
+    Assert.assertEquals(result.get(slaveInstance1).size(),0);
+    Assert.assertEquals(result.get(slaveInstance2).size(),0);
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a job has 
been assigned
+   * before and target partition is still on the same instance and in INIT 
state,
+   * we do not make new assignment for that task.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorSameInstanceInitTask() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 1;
+    String slaveInstance1 = INSTANCE_PREFIX + 2;
+    String slaveInstance2 = INSTANCE_PREFIX + 3;
+    // Preparing the inputs
+    CurrentStateOutput currentStateOutput = 
prepareCurrentState(TaskPartitionState.INIT,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, 
slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.INIT, 
masterInstance);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new 
HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, 
jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(masterInstance).size(),0);
+    Assert.assertEquals(result.get(slaveInstance1).size(),0);
+    Assert.assertEquals(result.get(slaveInstance2).size(),0);
+  }
+
+  /**
+   * Test FixedTargetTaskAssignmentCalculator and make sure that if a job has 
been assigned
+   * before and target partition has moved to another instance, controller 
assign the task to
+   * new/correct instance.
+   */
+  @Test
+  public void testFixedTargetTaskAssignmentCalculatorDifferentInstance() {
+    _assignableInstanceManager = new AssignableInstanceManager();
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig,
+        new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
+    // Preparing the inputs
+    String masterInstance = INSTANCE_PREFIX + 2;
+    String slaveInstance1 = INSTANCE_PREFIX + 1;
+    String slaveInstance2 = INSTANCE_PREFIX + 3;
+    // Preparing the inputs
+    CurrentStateOutput currentStateOutput = 
prepareCurrentState(TaskPartitionState.RUNNING,
+        masterInstance, slaveInstance1, slaveInstance2);
+    List<String> instances =
+        new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, 
slaveInstance2));
+    JobConfig jobConfig = prepareJobConfig();
+    JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, 
slaveInstance1);
+    WorkflowConfig workflowConfig = prepareWorkflowConfig();
+    WorkflowContext workflowContext = prepareWorkflowContext();
+    Set<Integer> partitionSet = new 
HashSet<>(Collections.singletonList(PARTITION_ID));
+    Map<String, IdealState> idealStates =
+        prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
+    TaskAssignmentCalculator taskAssignmentCal =
+        new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
+    Map<String, SortedSet<Integer>> result =
+        taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, 
jobConfig, jobContext,
+            workflowConfig, workflowContext, partitionSet, idealStates);
+    Assert.assertEquals(result.get(slaveInstance1).size(),0);
+    Assert.assertEquals(result.get(masterInstance).size(),1);
+    Assert.assertEquals(result.get(slaveInstance2).size(),0);
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setJobId(JOB_NAME);
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    List<String> targetPartition = new ArrayList<>();
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new 
HashSet<>(Collections.singletonList("MASTER"));
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    return jobConfigBuilder.build();
+  }
+
+  private JobContext prepareJobContext(TaskPartitionState taskPartitionState, 
String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setStartTime(0L);
+    jobContext.setPartitionState(PARTITION_ID, taskPartitionState);
+    jobContext.setPartitionTarget(PARTITION_ID, TARGET_RESOURCES + "_" + 
TARGET_PARTITION_NAME);
+    jobContext.setAssignedParticipant(PARTITION_ID, instance);
+    return jobContext;
+  }
+
+  private CurrentStateOutput prepareCurrentState(TaskPartitionState 
taskCurrentState,
+      String masterInstance, String slaveInstance1, String slaveInstance2) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance,
+        taskCurrentState.name());
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, 
masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, 
masterInstance, "MASTER");
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, 
slaveInstance1, "SLAVE");
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, 
slaveInstance2, "SLAVE");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, 
"");
+    return currentStateOutput;
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new 
WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    return workflowConfigBuilder.build();
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    
record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(),
 "0");
+    
record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), 
WORKFLOW_NAME);
+    
record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    
record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), 
jobState);
+    return new WorkflowContext(record);
+  }
+
+  private Map<String, IdealState> prepareIdealStates(String instance1, String 
instance2,
+      String instance3) {
+    Map<String, IdealState> idealStates = new HashMap<>();
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), 
"3");
+    
recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), 
"FULL_AUTO");
+    
recordDB.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(),
+        "AUTO_REBALANCE");
+    
recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "MasterSlave");
+    
recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    
recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+}

Reply via email to