http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java 
b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 1892062..d394931 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -175,6 +175,9 @@ public class JobConfig extends ResourceConfig {
   public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L;
   public static final boolean DEFAULT_REBALANCE_RUNNING_TASK = false;
 
+  // Cache TaskConfig objects for targeted jobs' tasks to reduce object 
creation/GC overload
+  private Map<String, TaskConfig> _targetedTaskConfigMap = new HashMap<>();
+
   public JobConfig(HelixProperty property) {
     super(property.getRecord());
   }
@@ -263,7 +266,9 @@ public class JobConfig extends ResourceConfig {
         String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
     getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
         rebalanceRunningTask);
-    putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType);
+    if (quotaType != null) {
+      putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType);
+    }
   }
 
   public String getWorkflow() {
@@ -355,8 +360,16 @@ public class JobConfig extends ResourceConfig {
         DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE);
   }
 
+  /**
+   * Returns taskConfigMap. If it's targeted, then return a cached 
targetedTaskConfigMap.
+   * @return
+   */
   public Map<String, TaskConfig> getTaskConfigMap() {
-    Map<String, TaskConfig> taskConfigMap = new HashMap<String, TaskConfig>();
+    String targetResource = 
getSimpleConfig(JobConfigProperty.TargetResource.name());
+    if (targetResource != null) {
+      return _targetedTaskConfigMap;
+    }
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
     for (Map.Entry<String, Map<String, String>> entry : 
getMapConfigs().entrySet()) {
       taskConfigMap.put(entry.getKey(),
           new TaskConfig(null, entry.getValue(), entry.getKey(), null));
@@ -364,10 +377,33 @@ public class JobConfig extends ResourceConfig {
     return taskConfigMap;
   }
 
+  /**
+   * If the job is targeted, try to get it from the cached 
targetedTaskConfigMap first. If not,
+   * create a TaskConfig on the fly.
+   * @param id pName for targeted tasks
+   * @return a TaskConfig object
+   */
   public TaskConfig getTaskConfig(String id) {
+    String targetResource = 
getSimpleConfig(JobConfigProperty.TargetResource.name());
+    if (targetResource != null) {
+      // This is a targeted task. For targeted tasks, id is pName
+      if (!_targetedTaskConfigMap.containsKey(id)) {
+        return new TaskConfig(null, null, id, null);
+      }
+      return _targetedTaskConfigMap.get(id);
+    }
     return new TaskConfig(null, getMapConfig(id), id, null);
   }
 
+  /**
+   * When a targeted task is assigned for the first time, cache it in 
JobConfig so that it could be
+   * retrieved later for release.
+   * @param pName a concatenation of job name + "_" + task partition number
+   */
+  public void setTaskConfig(String pName, TaskConfig taskConfig) {
+    _targetedTaskConfigMap.put(pName, taskConfig);
+  }
+
   public Map<String, String> getResourceConfigMap() {
     return getSimpleConfigs();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 11abb25..c49a365 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -40,6 +40,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,17 +52,14 @@ import com.google.common.collect.ImmutableMap;
  */
 public class JobRebalancer extends TaskRebalancer {
   private static final Logger LOG = 
LoggerFactory.getLogger(JobRebalancer.class);
-  private static TaskAssignmentCalculator _fixTaskAssignmentCal =
-      new FixedTargetTaskAssignmentCalculator();
-  private static TaskAssignmentCalculator _genericTaskAssignmentCal =
-      new GenericTaskAssignmentCalculator();
+  private static TaskAssignmentCalculator _fixTaskAssignmentCal;
+  private static TaskAssignmentCalculator _threadCountBasedTaskAssignmentCal;
 
   private static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   @Override
-  public ResourceAssignment computeBestPossiblePartitionState(
-      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
-      CurrentStateOutput currStateOutput) {
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache 
clusterData,
+      IdealState taskIs, Resource resource, CurrentStateOutput 
currStateOutput) {
     final String jobName = resource.getResourceName();
     LOG.debug("Computer Best Partition for job: " + jobName);
 
@@ -93,7 +91,8 @@ public class JobRebalancer extends TaskRebalancer {
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
-    // Stop current run of the job if workflow or job is already in final 
state (failed or completed)
+    // Stop current run of the job if workflow or job is already in final 
state (failed or
+    // completed)
     TaskState workflowState = workflowCtx.getWorkflowState();
     TaskState jobState = workflowCtx.getJobState(jobName);
     // The job is already in a final state (completed/failed).
@@ -114,7 +113,7 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (!TaskUtil.isJobStarted(jobName, workflowCtx) && 
!isJobReadyToSchedule(jobName, workflowCfg,
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
-        clusterData.getJobConfigMap())) {
+        clusterData.getJobConfigMap(), clusterData)) {
       LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
@@ -142,9 +141,9 @@ public class JobRebalancer extends TaskRebalancer {
     // is stored in zk.
     // Fetch the previous resource assignment from the property store. This is 
required because of
     // HELIX-230.
-    Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
-        ? clusterData.getEnabledLiveInstances()
-        : 
clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+    Set<String> liveInstances =
+        jobCfg.getInstanceGroupTag() == null ? 
clusterData.getEnabledLiveInstances()
+            : 
clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
       LOG.error("No available instance found for job!");
@@ -197,13 +196,13 @@ public class JobRebalancer extends TaskRebalancer {
 
     // Update Workflow and Job context in data cache and ZK.
     clusterData.updateJobContext(jobName, jobCtx, 
_manager.getHelixDataAccessor());
-    clusterData
-        .updateWorkflowContext(workflowResource, workflowCtx, 
_manager.getHelixDataAccessor());
+    clusterData.updateWorkflowContext(workflowResource, workflowCtx,
+        _manager.getHelixDataAccessor());
 
     setPrevResourceAssignment(jobName, newAssignment);
 
-    LOG.debug("Job " + jobName + " new assignment " + Arrays
-        .toString(newAssignment.getMappedPartitions().toArray()));
+    LOG.debug("Job " + jobName + " new assignment "
+        + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
     return newAssignment;
   }
 
@@ -213,7 +212,6 @@ public class JobRebalancer extends TaskRebalancer {
       CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, 
JobContext jobCtx,
       Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) {
 
-
     // Used to keep track of tasks that have already been assigned to 
instances.
     Set<Integer> assignedPartitions = new HashSet<>();
 
@@ -223,12 +221,13 @@ public class JobRebalancer extends TaskRebalancer {
     // Keeps a mapping of (partition) -> (instance, state)
     Map<Integer, PartitionAssignment> paMap = new TreeMap<>();
 
-    Set<String> excludedInstances = getExcludedInstances(jobResource, 
workflowConfig, cache);
+    Set<String> excludedInstances =
+        getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache);
 
     // Process all the current assignments of tasks.
-    TaskAssignmentCalculator taskAssignmentCal = 
getAssignmentCalulator(jobCfg);
-    Set<Integer> allPartitions = taskAssignmentCal
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, 
cache.getIdealStates());
+    TaskAssignmentCalculator taskAssignmentCal = 
getAssignmentCalculator(jobCfg, cache);
+    Set<Integer> allPartitions = 
taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
+        workflowConfig, workflowCtx, cache.getIdealStates());
 
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
@@ -236,7 +235,7 @@ public class JobRebalancer extends TaskRebalancer {
           "Empty task partition mapping for job " + jobResource + ", marked 
the job as FAILED!";
       LOG.info(failureMsg);
       jobCtx.setInfo(failureMsg);
-      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap());
+      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap(), cache);
       markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
       return new ResourceAssignment(jobResource);
     }
@@ -247,23 +246,23 @@ public class JobRebalancer extends TaskRebalancer {
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "All partitions: " + allPartitions + " taskAssignment: " + 
prevInstanceToTaskAssignments
-              + " excludedInstances: " + excludedInstances);
+      LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
+          + prevInstanceToTaskAssignments + " excludedInstances: " + 
excludedInstances);
     }
 
+    // Release resource for tasks in terminal state
     updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, 
excludedInstances, jobResource,
         currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, 
jobState,
-        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, 
skippedPartitions);
+        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, 
skippedPartitions, cache);
 
     addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
     if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > 
jobCfg.getFailureThreshold()
         || (jobCfg.getTargetResource() != null
-        && cache.getIdealState(jobCfg.getTargetResource()) != null && !cache
-        .getIdealState(jobCfg.getTargetResource()).isEnabled())) {
+        && cache.getIdealState(jobCfg.getTargetResource()) != null
+        && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
       if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
-        failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap());
+        failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap(), cache);
         return buildEmptyAssignment(jobResource, currStateOutput);
       }
       workflowCtx.setJobState(jobResource, TaskState.FAILING);
@@ -286,12 +285,13 @@ public class JobRebalancer extends TaskRebalancer {
     }
 
     if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, 
currStateOutput)) {
-      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap());
+      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap(), cache);
       return buildEmptyAssignment(jobResource, currStateOutput);
     }
 
     if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
-      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, 
cache.getJobConfigMap());
+      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, 
cache.getJobConfigMap(),
+          cache);
       _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
           jobCtx.getFinishTime() - jobCtx.getStartTime());
       _rebalanceScheduler.removeScheduledRebalance(jobResource);
@@ -299,7 +299,8 @@ public class JobRebalancer extends TaskRebalancer {
       return buildEmptyAssignment(jobResource, currStateOutput);
     }
 
-    // If job is being timed out and no task is running (for whatever reason), 
idealState can be deleted and all tasks
+    // If job is being timed out and no task is running (for whatever reason), 
idealState can be
+    // deleted and all tasks
     // can be dropped(note that Helix doesn't track whether the drop is 
success or not).
     if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, 
currStateOutput)) {
       handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
@@ -341,8 +342,8 @@ public class JobRebalancer extends TaskRebalancer {
       String instance = jobContext.getAssignedParticipant(pId);
       Message pendingMessage = currentStateOutput.getPendingState(jobResource, 
partition, instance);
       // If state is INIT but is pending INIT->RUNNING, it's not yet safe to 
say the job finished
-      if (state == TaskPartitionState.RUNNING || (state == 
TaskPartitionState.INIT
-          && pendingMessage != null)) {
+      if (state == TaskPartitionState.RUNNING
+          || (state == TaskPartitionState.INIT && pendingMessage != null)) {
         return false;
       }
     }
@@ -351,28 +352,25 @@ public class JobRebalancer extends TaskRebalancer {
 
   /**
    * Get the last task assignment for a given job
-   *
    * @param resourceName the name of the job
-   *
    * @return {@link ResourceAssignment} instance, or null if no assignment is 
available
    */
   private ResourceAssignment getPrevResourceAssignment(String resourceName) {
-    ZNRecord r = _manager.getHelixPropertyStore()
-        .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resourceName, PREV_RA_NODE),
-            null, AccessOption.PERSISTENT);
+    ZNRecord r = _manager.getHelixPropertyStore().get(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resourceName, PREV_RA_NODE),
+        null, AccessOption.PERSISTENT);
     return r != null ? new ResourceAssignment(r) : null;
   }
 
   /**
    * Set the last task assignment for a given job
-   *
    * @param resourceName the name of the job
-   * @param ra           {@link ResourceAssignment} containing the task 
assignment
+   * @param ra {@link ResourceAssignment} containing the task assignment
    */
   private void setPrevResourceAssignment(String resourceName, 
ResourceAssignment ra) {
-    _manager.getHelixPropertyStore()
-        .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resourceName, PREV_RA_NODE),
-            ra.getRecord(), AccessOption.PERSISTENT);
+    _manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
resourceName, PREV_RA_NODE),
+        ra.getRecord(), AccessOption.PERSISTENT);
   }
 
   /**
@@ -389,7 +387,8 @@ public class JobRebalancer extends TaskRebalancer {
         if (!isTaskGivenup(ctx, cfg, pId)) {
           return false;
         }
-        // If the task is given up, there's still chance the job has completed 
because of job failure threshold.
+        // If the task is given up, there's still chance the job has completed 
because of job
+        // failure threshold.
         numOfGivenUpTasks++;
       }
     }
@@ -398,9 +397,8 @@ public class JobRebalancer extends TaskRebalancer {
 
   /**
    * @param liveInstances
-   * @param prevAssignment    task partition -> (instance -> state)
+   * @param prevAssignment task partition -> (instance -> state)
    * @param allTaskPartitions all task partitionIds
-   *
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
   private static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
@@ -426,7 +424,24 @@ public class JobRebalancer extends TaskRebalancer {
     return result;
   }
 
-  private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) 
{
-    return TaskUtil.isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : 
_fixTaskAssignmentCal;
+  /**
+   * If the job is a targeted job, use fixedTaskAssignmentCalculator. 
Otherwise, use
+   * threadCountBasedTaskAssignmentCalculator. Both calculators support 
quota-based scheduling.
+   * @param jobConfig
+   * @param cache
+   * @return
+   */
+  private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig,
+      ClusterDataCache cache) {
+    AssignableInstanceManager assignableInstanceManager = 
cache.getAssignableInstanceManager();
+    if (_threadCountBasedTaskAssignmentCal == null) {
+      _threadCountBasedTaskAssignmentCal = new 
ThreadCountBasedTaskAssignmentCalculator(
+          new ThreadCountBasedTaskAssigner(), assignableInstanceManager);
+    }
+    if (_fixTaskAssignmentCal == null) {
+      _fixTaskAssignmentCal = new 
FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
+    }
+    return TaskUtil.isGenericTaskJob(jobConfig) ? 
_threadCountBasedTaskAssignmentCal
+        : _fixTaskAssignmentCal;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 66b961b..8286257 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -11,7 +11,7 @@ import java.util.SortedSet;
 
 public abstract class TaskAssignmentCalculator {
   /**
-   * Get all the partitions that should be created by this task
+   * Get all the partitions/tasks that belong to this job.
    *
    * @param jobCfg the task configuration
    * @param jobCtx the task context
@@ -43,4 +43,4 @@ public abstract class TaskAssignmentCalculator {
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> 
partitionSet,
       Map<String, IdealState> idealStateMap);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1d29368..dae0da6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -52,14 +52,12 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
     _manager = manager;
   }
 
-
   @Override
   public abstract ResourceAssignment 
computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput 
currStateOutput);
 
   /**
    * Checks if the workflow has been stopped.
-   *
    * @param ctx Workflow context containing task states
    * @param cfg Workflow config containing set of tasks
    * @return returns true if all tasks are {@link TaskState#STOPPED}, false 
otherwise.
@@ -67,8 +65,8 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
   protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) 
{
     for (String job : cfg.getJobDag().getAllNodes()) {
       TaskState jobState = ctx.getJobState(job);
-      if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || 
jobState
-          .equals(TaskState.STOPPING))) {
+      if (jobState != null
+          && (jobState.equals(TaskState.IN_PROGRESS) || 
jobState.equals(TaskState.STOPPING))) {
         return false;
       }
     }
@@ -92,14 +90,14 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
 
   /**
    * Check all the dependencies of a job to determine whether the job is ready 
to be scheduled.
-   *
    * @param job
    * @param workflowCfg
    * @param workflowCtx
    * @return
    */
   protected boolean isJobReadyToSchedule(String job, WorkflowConfig 
workflowCfg,
-      WorkflowContext workflowCtx, int incompleteAllCount, Map<String, 
JobConfig> jobConfigMap) {
+      WorkflowContext workflowCtx, int incompleteAllCount, Map<String, 
JobConfig> jobConfigMap,
+      ClusterDataCache clusterDataCache) {
     int notStartedCount = 0;
     int failedOrTimeoutCount = 0;
     int incompleteParentCount = 0;
@@ -118,8 +116,8 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
     // If there is any parent job not started, this job should not be scheduled
     if (notStartedCount > 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, notStartedParent(s)=%d.", 
job, notStartedCount));
+        LOG.debug(String.format("Job %s is not ready to start, 
notStartedParent(s)=%d.", job,
+            notStartedCount));
       }
       return false;
     }
@@ -132,10 +130,10 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
       return false;
     }
     if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
-      markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap);
+      markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, 
clusterDataCache);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, failedCount(s)=%d.", job, 
failedOrTimeoutCount));
+        LOG.debug(String.format("Job %s is not ready to start, 
failedCount(s)=%d.", job,
+            failedOrTimeoutCount));
       }
       return false;
     }
@@ -166,7 +164,6 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
 
   /**
    * Check if a workflow is ready to schedule.
-   *
    * @param workflowCfg the workflow to check
    * @return true if the workflow is ready for schedule, false if not ready
    */
@@ -177,12 +174,10 @@ public abstract class TaskRebalancer extends 
AbstractTaskDispatcher
   }
 
   @Override
-  public IdealState computeNewIdealState(String resourceName,
-      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
-      ClusterDataCache clusterData) {
+  public IdealState computeNewIdealState(String resourceName, IdealState 
currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     // All of the heavy lifting is in the ResourceAssignment computation,
     // so this part can just be a no-op.
     return currentIdealState;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index b3a7f29..68a9d4a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -59,11 +59,10 @@ public class TaskUtil {
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} 
object.
    * This method is internal API, please use the corresponding one in 
TaskDriver.getJobConfig();
-   *
-   * @param accessor    Accessor to access Helix configs
+   * @param accessor Accessor to access Helix configs
    * @param job The name of the job resource
    * @return A {@link JobConfig} object if Helix contains valid configurations 
for the job, null
-   * otherwise.
+   *         otherwise.
    */
   protected static JobConfig getJobConfig(HelixDataAccessor accessor, String 
job) {
     HelixProperty jobResourceConfig = getResourceConfig(accessor, job);
@@ -76,11 +75,10 @@ public class TaskUtil {
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} 
object.
    * This method is internal API, please use the corresponding one in 
TaskDriver.getJobConfig();
-   *
-   * @param manager     HelixManager object used to connect to Helix.
+   * @param manager HelixManager object used to connect to Helix.
    * @param job The name of the job resource.
    * @return A {@link JobConfig} object if Helix contains valid configurations 
for the job, null
-   * otherwise.
+   *         otherwise.
    */
   protected static JobConfig getJobConfig(HelixManager manager, String job) {
     return getJobConfig(manager.getHelixDataAccessor(), job);
@@ -88,11 +86,9 @@ public class TaskUtil {
 
   /**
    * Set the job config
-   *
-   * @param accessor  Accessor to Helix configs
-   * @param job       The job name
+   * @param accessor Accessor to Helix configs
+   * @param job The job name
    * @param jobConfig The job config to be set
-   *
    * @return True if set successfully, otherwise false
    */
   protected static boolean setJobConfig(HelixDataAccessor accessor, String job,
@@ -102,10 +98,8 @@ public class TaskUtil {
 
   /**
    * Remove a job config.
-   *
    * @param accessor
    * @param job
-   *
    * @return
    */
   protected static boolean removeJobConfig(HelixDataAccessor accessor, String 
job) {
@@ -114,12 +108,12 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link 
WorkflowConfig} object.
-   * This method is internal API, please use the corresponding one in 
TaskDriver.getWorkflowConfig();
-   *
-   * @param accessor  Accessor to access Helix configs
+   * This method is internal API, please use the corresponding one in
+   * TaskDriver.getWorkflowConfig();
+   * @param accessor Accessor to access Helix configs
    * @param workflow The name of the workflow.
    * @return A {@link WorkflowConfig} object if Helix contains valid 
configurations for the
-   * workflow, null otherwise.
+   *         workflow, null otherwise.
    */
   protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor 
accessor, String workflow) {
     HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
@@ -132,12 +126,12 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link 
WorkflowConfig} object.
-   * This method is internal API, please use the corresponding one in 
TaskDriver.getWorkflowConfig();
-   *
-   * @param manager          Helix manager object used to connect to Helix.
+   * This method is internal API, please use the corresponding one in
+   * TaskDriver.getWorkflowConfig();
+   * @param manager Helix manager object used to connect to Helix.
    * @param workflow The name of the workflow resource.
    * @return A {@link WorkflowConfig} object if Helix contains valid 
configurations for the
-   * workflow, null otherwise.
+   *         workflow, null otherwise.
    */
   protected static WorkflowConfig getWorkflowConfig(HelixManager manager, 
String workflow) {
     return getWorkflowConfig(manager.getHelixDataAccessor(), workflow);
@@ -145,10 +139,10 @@ public class TaskUtil {
 
   /**
    * Set the workflow config
-   * @param accessor        Accessor to Helix configs
-   * @param workflow        The workflow name
-   * @param workflowConfig  The workflow config to be set
-   * @return                True if set successfully, otherwise false
+   * @param accessor Accessor to Helix configs
+   * @param workflow The workflow name
+   * @param workflowConfig The workflow config to be set
+   * @return True if set successfully, otherwise false
    */
   protected static boolean setWorkflowConfig(HelixDataAccessor accessor, 
String workflow,
       WorkflowConfig workflowConfig) {
@@ -167,9 +161,8 @@ public class TaskUtil {
 
   /**
    * Get a Helix configuration scope at a resource (i.e. job and workflow) 
level
-   *
    * @param clusterName the cluster containing the resource
-   * @param resource    the resource name
+   * @param resource the resource name
    * @return instantiated {@link HelixConfigScope}
    */
   protected static HelixConfigScope getResourceConfigScope(String clusterName, 
String resource) {
@@ -180,24 +173,22 @@ public class TaskUtil {
   /**
    * Get the runtime context of a single job.
    * This method is internal API, please use TaskDriver.getJobContext();
-   *
    * @param propertyStore Property store for the cluster
-   * @param jobResource   The name of the job
+   * @param jobResource The name of the job
    * @return the {@link JobContext}, or null if none is available
    */
   protected static JobContext getJobContext(HelixPropertyStore<ZNRecord> 
propertyStore,
       String jobResource) {
-    ZNRecord r = propertyStore
-        .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
jobResource, CONTEXT_NODE),
-            null, AccessOption.PERSISTENT);
+    ZNRecord r = propertyStore.get(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
jobResource, CONTEXT_NODE), null,
+        AccessOption.PERSISTENT);
     return r != null ? new JobContext(r) : null;
   }
 
   /**
    * Get the runtime context of a single job.
    * This method is internal API, please use TaskDriver.getJobContext();
-   *
-   * @param manager     a connection to Helix
+   * @param manager a connection to Helix
    * @param jobResource the name of the job
    * @return the {@link JobContext}, or null if none is available
    */
@@ -208,24 +199,22 @@ public class TaskUtil {
   /**
    * Set the runtime context of a single job
    * This method is internal API;
-   *
-   * @param manager     a connection to Helix
+   * @param manager a connection to Helix
    * @param jobResource the name of the job
-   * @param ctx         the up-to-date {@link JobContext} for the job
+   * @param ctx the up-to-date {@link JobContext} for the job
    */
   protected static void setJobContext(HelixManager manager, String 
jobResource, JobContext ctx) {
-    manager.getHelixPropertyStore()
-        .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
jobResource, CONTEXT_NODE),
-            ctx.getRecord(), AccessOption.PERSISTENT);
+    manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
jobResource, CONTEXT_NODE),
+        ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
   /**
    * Remove the runtime context of a single job.
    * This method is internal API.
-   *
-   * @param manager     A connection to Helix
+   * @param manager A connection to Helix
    * @param jobResource The name of the job
-   * @return            True if remove success, otherwise false
+   * @return True if remove success, otherwise false
    */
   protected static boolean removeJobContext(HelixManager manager, String 
jobResource) {
     return removeJobContext(manager.getHelixPropertyStore(), jobResource);
@@ -234,10 +223,9 @@ public class TaskUtil {
   /**
    * Remove the runtime context of a single job.
    * This method is internal API.
-   *
    * @param propertyStore Property store for the cluster
-   * @param job   The name of the job
-   * @return              True if remove success, otherwise false
+   * @param job The name of the job
+   * @return True if remove success, otherwise false
    */
   protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> 
propertyStore,
       String job) {
@@ -246,25 +234,25 @@ public class TaskUtil {
 
   /**
    * Get the runtime context of a single workflow.
-   * This method is internal API, please use the corresponding one in 
TaskDriver.getWorkflowContext();
-   *
-   * @param propertyStore    Property store of the cluster
+   * This method is internal API, please use the corresponding one in
+   * TaskDriver.getWorkflowContext();
+   * @param propertyStore Property store of the cluster
    * @param workflow The name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
   protected static WorkflowContext 
getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
       String workflow) {
     ZNRecord r = propertyStore.get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, 
CONTEXT_NODE),
-        null, AccessOption.PERSISTENT);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, 
CONTEXT_NODE), null,
+        AccessOption.PERSISTENT);
     return r != null ? new WorkflowContext(r) : null;
   }
 
   /**
    * Get the runtime context of a single workflow.
-   * This method is internal API, please use the corresponding one in 
TaskDriver.getWorkflowContext();
-   *
-   * @param manager          a connection to Helix
+   * This method is internal API, please use the corresponding one in
+   * TaskDriver.getWorkflowContext();
+   * @param manager a connection to Helix
    * @param workflow the name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
@@ -274,10 +262,9 @@ public class TaskUtil {
 
   /**
    * Set the runtime context of a single workflow
-   *
-   * @param manager          a connection to Helix
+   * @param manager a connection to Helix
    * @param workflow the name of the workflow
-   * @param workflowContext              the up-to-date {@link 
WorkflowContext} for the workflow
+   * @param workflowContext the up-to-date {@link WorkflowContext} for the 
workflow
    */
   protected static void setWorkflowContext(HelixManager manager, String 
workflow,
       WorkflowContext workflowContext) {
@@ -289,10 +276,9 @@ public class TaskUtil {
   /**
    * Remove the runtime context of a single workflow.
    * This method is internal API.
-   *
-   * @param manager     A connection to Helix
+   * @param manager A connection to Helix
    * @param workflow The name of the workflow
-   * @return            True if remove success, otherwise false
+   * @return True if remove success, otherwise false
    */
   protected static boolean removeWorkflowContext(HelixManager manager, String 
workflow) {
     return removeWorkflowContext(manager.getHelixPropertyStore(), workflow);
@@ -301,10 +287,9 @@ public class TaskUtil {
   /**
    * Remove the runtime context of a single workflow.
    * This method is internal API.
-   *
-   * @param propertyStore      Property store for the cluster
-   * @param workflow   The name of the workflow
-   * @return                   True if remove success, otherwise false
+   * @param propertyStore Property store for the cluster
+   * @param workflow The name of the workflow
+   * @return True if remove success, otherwise false
    */
   protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> 
propertyStore,
       String workflow) {
@@ -313,49 +298,46 @@ public class TaskUtil {
 
   /**
    * Intialize the user content store znode setup
-   * @param propertyStore       zookeeper property store
+   * @param propertyStore zookeeper property store
    * @param workflowJobResource the name of workflow or job
-   * @param record              the initial data
+   * @param record the initial data
    */
-  protected static void createUserContent(HelixPropertyStore propertyStore, 
String workflowJobResource,
-      ZNRecord record) {
-    propertyStore.create(Joiner.on("/")
-        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource,
-            TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT);
+  protected static void createUserContent(HelixPropertyStore propertyStore,
+      String workflowJobResource, ZNRecord record) {
+    
propertyStore.create(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
+        workflowJobResource, TaskUtil.USER_CONTENT_NODE), record, 
AccessOption.PERSISTENT);
   }
 
   /**
    * Get user defined workflow or job level key-value pair data
-   *
-   * @param manager             a connection to Helix
+   * @param manager a connection to Helix
    * @param workflowJobResource the name of workflow
-   * @param key                 the key of key-value pair
-   *
+   * @param key the key of key-value pair
    * @return null if there is no such pair, otherwise return a String
    */
   protected static String getWorkflowJobUserContent(HelixManager manager,
       String workflowJobResource, String key) {
     ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, 
USER_CONTENT_NODE),
-        null, AccessOption.PERSISTENT);
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, 
USER_CONTENT_NODE), null,
+        AccessOption.PERSISTENT);
     return r != null ? r.getSimpleField(key) : null;
   }
 
   /**
    * Add an user defined key-value pair data to workflow or job level
-   *
-   * @param manager             a connection to Helix
+   * @param manager a connection to Helix
    * @param workflowJobResource the name of workflow or job
-   * @param key                 the key of key-value pair
-   * @param value               the value of key-value pair
+   * @param key the key of key-value pair
+   * @param value the value of key-value pair
    */
   protected static void addWorkflowJobUserContent(final HelixManager manager,
       String workflowJobResource, final String key, final String value) {
-    String path = Joiner.on("/")
-        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, 
USER_CONTENT_NODE);
+    String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
workflowJobResource,
+        USER_CONTENT_NODE);
 
     manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord znRecord) {
+      @Override
+      public ZNRecord update(ZNRecord znRecord) {
         znRecord.setSimpleField(key, value);
         return znRecord;
       }
@@ -364,32 +346,27 @@ public class TaskUtil {
 
   /**
    * Get user defined task level key-value pair data
-   *
-   * @param manager      a connection to Helix
-   * @param job  the name of job
+   * @param manager a connection to Helix
+   * @param job the name of job
    * @param task the name of the task
-   * @param key          the key of key-value pair
-   *
+   * @param key the key of key-value pair
    * @return null if there is no such pair, otherwise return a String
    */
-  protected static String getTaskUserContent(HelixManager manager, String job,
-      String task, String key) {
+  protected static String getTaskUserContent(HelixManager manager, String job, 
String task,
+      String key) {
     ZNRecord r = manager.getHelixPropertyStore().get(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, 
USER_CONTENT_NODE), null,
         AccessOption.PERSISTENT);
-    return r != null ? (r.getMapField(task) != null
-        ? r.getMapField(task).get(key)
-        : null) : null;
+    return r != null ? (r.getMapField(task) != null ? 
r.getMapField(task).get(key) : null) : null;
   }
 
   /**
    * Add an user defined key-value pair data to task level
-   *
-   * @param manager       a connection to Helix
-   * @param job   the name of job
-   * @param task  the name of task
-   * @param key           the key of key-value pair
-   * @param value         the value of key-value pair
+   * @param manager a connection to Helix
+   * @param job the name of job
+   * @param task the name of task
+   * @param key the key of key-value pair
+   * @param value the value of key-value pair
    */
   protected static void addTaskUserContent(final HelixManager manager, String 
job,
       final String task, final String key, final String value) {
@@ -397,7 +374,8 @@ public class TaskUtil {
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, 
USER_CONTENT_NODE);
 
     manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord znRecord) {
+      @Override
+      public ZNRecord update(ZNRecord znRecord) {
         if (znRecord.getMapField(task) == null) {
           znRecord.setMapField(task, new HashMap<String, String>());
         }
@@ -406,9 +384,9 @@ public class TaskUtil {
       }
     }, AccessOption.PERSISTENT);
   }
+
   /**
    * Get a workflow-qualified job name for a single-job workflow
-   *
    * @param singleJobWorkflow the name of the single-job workflow
    * @return The namespaced job name, which is just 
singleJobWorkflow_singleJobWorkflow
    */
@@ -418,9 +396,8 @@ public class TaskUtil {
 
   /**
    * Get a workflow-qualified job name for a job in that workflow
-   *
    * @param workflow the name of the workflow
-   * @param jobName          the un-namespaced name of the job
+   * @param jobName the un-namespaced name of the job
    * @return The namespaced job name, which is just workflowResource_jobName
    */
   public static String getNamespacedJobName(String workflow, String jobName) {
@@ -429,9 +406,8 @@ public class TaskUtil {
 
   /**
    * Remove the workflow namespace from the job name
-   *
    * @param workflow the name of the workflow that owns the job
-   * @param jobName          the namespaced job name
+   * @param jobName the namespaced job name
    * @return the denamespaced job name, or the same job name if it is already 
denamespaced
    */
   public static String getDenamespacedJobName(String workflow, String jobName) 
{
@@ -445,7 +421,6 @@ public class TaskUtil {
 
   /**
    * Serialize a map of job-level configurations as a single string
-   *
    * @param commandConfig map of job config key to config value
    * @return serialized string
    */
@@ -464,7 +439,6 @@ public class TaskUtil {
 
   /**
    * Deserialize a single string into a map of job-level configurations
-   *
    * @param commandConfig the serialized job config map
    * @return a map of job config key to config value
    */
@@ -485,7 +459,6 @@ public class TaskUtil {
 
   /**
    * Extracts the partition id from the given partition name.
-   *
    * @param pName
    * @return
    */
@@ -504,27 +477,27 @@ public class TaskUtil {
   }
 
   @Deprecated
-  public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor 
accessor, String workflow) {
+  public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor 
accessor,
+      String workflow) {
     return accessor.keyBuilder().resourceConfig(workflow);
   }
 
   /**
    * Cleans up IdealState and external view associated with a job.
-   *
    * @param accessor
    * @param job
-   * @return  True if remove success, otherwise false
+   * @return True if remove success, otherwise false
    */
-  protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor 
accessor, String job) {
+  protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor 
accessor,
+      String job) {
     return cleanupIdealStateExtView(accessor, job);
   }
 
   /**
    * Cleans up IdealState and external view associated with a workflow.
-   *
    * @param accessor
    * @param workflow
-   * @return  True if remove success, otherwise false
+   * @return True if remove success, otherwise false
    */
   protected static boolean cleanupWorkflowIdealStateExtView(final 
HelixDataAccessor accessor,
       String workflow) {
@@ -565,56 +538,49 @@ public class TaskUtil {
    * Remove a workflow and all jobs for the workflow. This removes the 
workflow config, idealstate,
    * externalview and workflow contexts associated with this workflow, and all 
jobs information,
    * including their configs, context, IS and EV.
-   *
    * @param accessor
    * @param propertyStore
    * @param workflow the workflow name.
-   * @param jobs     all job names in this workflow.
-   *
-   * @return  True if remove success, otherwise false
+   * @param jobs all job names in this workflow.
+   * @return True if remove success, otherwise false
    */
-  protected static boolean removeWorkflow(final HelixDataAccessor accessor, 
final HelixPropertyStore propertyStore,
-      String workflow, Set<String> jobs) {
-    boolean success = true;
-
+  protected static boolean removeWorkflow(final HelixDataAccessor accessor,
+      final HelixPropertyStore propertyStore, String workflow, Set<String> 
jobs) {
     // clean up all jobs
     for (String job : jobs) {
       if (!removeJob(accessor, propertyStore, job)) {
-        success = false;
+        return false;
       }
     }
 
     if (!removeWorkflowConfig(accessor, workflow)) {
       LOG.warn(
           String.format("Error occurred while trying to remove workflow config 
for %s.", workflow));
-      success = false;
+      return false;
     }
     if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
-      LOG.warn(String
-          .format("Error occurred while trying to remove workflow 
idealstate/externalview for %s.",
-              workflow));
-      success = false;
+      LOG.warn(String.format(
+          "Error occurred while trying to remove workflow 
idealstate/externalview for %s.",
+          workflow));
+      return false;
     }
     if (!removeWorkflowContext(propertyStore, workflow)) {
-      LOG.warn(String
-          .format("Error occurred while trying to remove workflow context for 
%s.", workflow));
-      success = false;
+      LOG.warn(String.format("Error occurred while trying to remove workflow 
context for %s.",
+          workflow));
+      return false;
     }
-
-    return success;
+    return true;
   }
 
   /**
    * Remove a set of jobs from a workflow. This removes the config, context, 
IS and EV associated
    * with each individual job, and removes all the jobs from the 
WorkflowConfig, and job states from
    * WorkflowContext.
-   *
    * @param dataAccessor
    * @param propertyStore
    * @param jobs
    * @param workflow
    * @param maintainDependency
-   *
    * @return True if remove success, otherwise false
    */
   protected static boolean removeJobsFromWorkflow(final HelixDataAccessor 
dataAccessor,
@@ -627,9 +593,8 @@ public class TaskUtil {
       success = false;
     }
     if (!removeJobsState(propertyStore, workflow, jobs)) {
-      LOG.warn(
-          "Error occurred while trying to remove jobs states from workflow + " 
+ workflow + " jobs "
-              + jobs);
+      LOG.warn("Error occurred while trying to remove jobs states from 
workflow + " + workflow
+          + " jobs " + jobs);
       success = false;
     }
     for (String job : jobs) {
@@ -643,12 +608,10 @@ public class TaskUtil {
 
   /**
    * Return all jobs that are COMPLETED and passes its expiry time.
-   *
    * @param dataAccessor
    * @param propertyStore
    * @param workflowConfig
    * @param workflowContext
-   *
    * @return
    */
   protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
@@ -685,8 +648,8 @@ public class TaskUtil {
       success = false;
     }
     if (!cleanupJobIdealStateExtView(accessor, job)) {
-      LOG.warn(String
-          .format("Error occurred while trying to remove job 
idealstate/externalview for %s.", job));
+      LOG.warn(String.format(
+          "Error occurred while trying to remove job idealstate/externalview 
for %s.", job));
       success = false;
     }
     if (!removeJobContext(propertyStore, job)) {
@@ -699,8 +662,8 @@ public class TaskUtil {
 
   /** Remove the job name from the DAG from the queue configuration */
   // Job name should be namespaced job name here.
-  protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, 
final String workflow,
-      final Set<String> jobsToRemove, final boolean maintainDependency) {
+  protected static boolean removeJobsFromDag(final HelixDataAccessor accessor,
+      final String workflow, final Set<String> jobsToRemove, final boolean 
maintainDependency) {
     // Now atomically clear the DAG
     DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
       @Override
@@ -716,8 +679,8 @@ public class TaskUtil {
             jobDag.removeNode(job, maintainDependency);
           }
           try {
-            currentData
-                
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), 
jobDag.toJson());
+            
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(),
+                jobDag.toJson());
           } catch (IOException e) {
             throw new IllegalArgumentException(e);
           }
@@ -749,7 +712,8 @@ public class TaskUtil {
     }
 
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
         if (currentData != null) {
           WorkflowContext workflowContext = new WorkflowContext(currentData);
           workflowContext.removeJobStates(jobs);
@@ -782,7 +746,6 @@ public class TaskUtil {
 
   /**
    * Remove workflow or job config.
-   *
    * @param accessor
    * @param workflowJobResource the workflow or job name
    */
@@ -803,10 +766,10 @@ public class TaskUtil {
 
   /**
    * Set the resource config
-   * @param accessor        Accessor to Helix configs
-   * @param resource        The resource name
-   * @param resourceConfig  The resource config to be set
-   * @return                True if set successfully, otherwise false
+   * @param accessor Accessor to Helix configs
+   * @param resource The resource name
+   * @param resourceConfig The resource config to be set
+   * @return True if set successfully, otherwise false
    */
   private static boolean setResourceConfig(HelixDataAccessor accessor, String 
resource,
       ResourceConfig resourceConfig) {
@@ -830,16 +793,19 @@ public class TaskUtil {
     return nonReadyPartitions;
   }
 
+  /**
+   * Returns whether if a given job is a generic job (not a targeted job).
+   * @param jobConfig
+   * @return
+   */
   public static boolean isGenericTaskJob(JobConfig jobConfig) {
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    return taskConfigMap != null && !taskConfigMap.isEmpty();
+    // Targeted jobs may have TaskConfigs, so we check whether the target 
resource is set
+    return jobConfig.getTargetResource() == null || 
jobConfig.getTargetResource().equals("");
   }
 
   /**
    * Check whether tasks are just started or still running
-   *
    * @param jobContext The job context
-   *
    * @return False if still tasks not in final state. Otherwise return true
    */
   public static boolean checkJobStopped(JobContext jobContext) {
@@ -852,10 +818,8 @@ public class TaskUtil {
     return true;
   }
 
-
   /**
    * Count the number of jobs in a workflow that are not in final state.
-   *
    * @param workflowCfg
    * @param workflowCtx
    * @return
@@ -876,4 +840,4 @@ public class TaskUtil {
     TaskState jobState = workflowContext.getJobState(job);
     return (jobState != null && jobState != TaskState.NOT_STARTED);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
new file mode 100644
index 0000000..56221eb
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -0,0 +1,160 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.task.assigner.AssignableInstance;
+import org.apache.helix.task.assigner.TaskAssignResult;
+import org.apache.helix.task.assigner.TaskAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ThreadCountBasedTaskAssignmentCalculator is an implementation of 
TaskAssignmentCalculator. It
+ * serves as a wrapper around ThreadCountBasedTaskAssigner so that it could be 
used in the existing
+ * Task Framework Controller pipeline (WorkflowRebalancer and JobRebalancer).
+ */
+public class ThreadCountBasedTaskAssignmentCalculator extends 
TaskAssignmentCalculator {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ThreadCountBasedTaskAssignmentCalculator.class);
+  private TaskAssigner _taskAssigner;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  /**
+   * Constructor for ThreadCountBasedTaskAssignmentCalculator. Requires an 
instance of
+   * ThreadCountBasedTaskAssigner and the up-to-date AssignableInstanceManager.
+   * @param taskAssigner
+   * @param assignableInstanceManager
+   */
+  public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
+      AssignableInstanceManager assignableInstanceManager) {
+    _taskAssigner = taskAssigner;
+    _assignableInstanceManager = assignableInstanceManager;
+  }
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Map<String, IdealState> idealStateMap) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      int nextPartition = jobCtx.getPartitionSet().size();
+      if (!taskIdMap.containsKey(taskId)) {
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput 
currStateOutput,
+      ResourceAssignment prevAssignment, Collection<String> instances, 
JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext 
workflowCtx,
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+
+    if (jobCfg.getTargetResource() != null) {
+      LOG.error(
+          "Target resource is not null, should call 
FixedTaskAssignmentCalculator, target resource : {}",
+          jobCfg.getTargetResource());
+      return new HashMap<>();
+    }
+
+    // Get AssignableInstances
+    Iterable<AssignableInstance> assignableInstances =
+        _assignableInstanceManager.getAssignableInstanceMap().values();
+
+    // Convert the filtered partitionSet (partition numbers) to TaskConfigs
+    Iterable<TaskConfig> taskConfigs = getFilteredTaskConfigs(partitionSet, 
jobCfg, jobContext);
+
+    // Get the quota type to assign tasks to
+    String quotaType = jobCfg.getQuotaType();
+
+    // Assign tasks to AssignableInstances
+    Map<String, TaskAssignResult> taskAssignResultMap =
+        _taskAssigner.assignTasks(assignableInstances, taskConfigs, quotaType);
+
+    // TODO: Do this with Quota Manager is ready
+    // Cache TaskAssignResultMap to prevent double-assign
+    // This will be used in AbstractTaskDispatcher to release tasks that 
aren't actually
+    // scheduled/throttled
+    // 
_assignableInstanceManager.getTaskAssignResultMap().putAll(taskAssignResultMap);
+
+    // Get TaskId->PartitionNumber mappings for conversion
+    Map<String, Integer> taskIdPartitionMap = 
jobContext.getTaskIdPartitionMap();
+
+    // Instantiate the result map that maps instance to set of task 
(partition) mappings
+    Map<String, SortedSet<Integer>> taskAssignment = new HashMap<>();
+
+    // Loop through all TaskAssignResults and convert the result to the format 
compliant to
+    // TaskAssignmentCalculator's API
+    for (Map.Entry<String, TaskAssignResult> assignResultEntry : 
taskAssignResultMap.entrySet()) {
+      TaskAssignResult taskAssignResult = assignResultEntry.getValue();
+
+      if (taskAssignResult.isSuccessful()) {
+        String instanceName = taskAssignResult.getInstanceName();
+        String taskId = taskAssignResult.getTaskConfig().getId();
+        // Since return value contains SortedSet<Integer> which is a set of 
partition numbers, we
+        // must convert the taskID (given in TaskAssignResult) to its 
corresponding partition
+        // number using taskIdPartitionMap found in JobContext
+        if (!taskIdPartitionMap.containsKey(taskId)) {
+          LOG.warn(
+              "Task is not found in taskIdPartitionMap. Skipping this task! 
JobID: {}, TaskID: {}",
+              jobCfg.getJobId(), taskId);
+          continue;
+        }
+        int partitionNumberForTask = taskIdPartitionMap.get(taskId);
+        if (!taskAssignment.containsKey(instanceName)) {
+          taskAssignment.put(instanceName, new TreeSet<Integer>());
+        }
+        taskAssignment.get(instanceName).add(partitionNumberForTask);
+      }
+    }
+    return taskAssignment;
+  }
+
+  /**
+   * Returns TaskConfigs whose partition numbers (ids) are present in 
filteredPartitionNumbers. This
+   * means that these tasks should have the state of INIT, RUNNING, or null. 
This function basically
+   * converts partition numbers to corresponding TaskConfigs.
+   * @param jobContext
+   * @param filteredPartitionNumbers
+   * @return
+   */
+  private Iterable<TaskConfig> getFilteredTaskConfigs(Set<Integer> 
filteredPartitionNumbers,
+      JobConfig jobConfig, JobContext jobContext) {
+    Set<TaskConfig> filteredTaskConfigs = new HashSet<>();
+    for (int partitionNumber : filteredPartitionNumbers) {
+      String taskId = jobContext.getTaskIdForPartition(partitionNumber);
+      TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
+      filteredTaskConfigs.add(taskConfig);
+    }
+    return filteredTaskConfigs;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 32eed2c..32a5370 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -55,8 +55,7 @@ import com.google.common.collect.Lists;
 public class WorkflowRebalancer extends TaskRebalancer {
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowRebalancer.class);
   private static final Set<TaskState> finalStates = new HashSet<>(
-      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, 
TaskState.TIMED_OUT)
-  );
+      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, 
TaskState.TIMED_OUT));
 
   @Override
   public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache 
clusterData,
@@ -78,7 +77,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the 
workflow context.");
-      cleanupWorkflow(workflow,  workflowCfg);
+      cleanupWorkflow(workflow, workflowCfg);
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -89,8 +88,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
       // If timeout point has already been passed, it will not be scheduled
       scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), 
workflowCfg.getTimeout());
 
-      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && 
isTimeout(
-          workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
+      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
+          && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
         workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
         clusterData.updateWorkflowContext(workflow, workflowCtx, 
_manager.getHelixDataAccessor());
       }
@@ -104,8 +103,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     // Step 3: handle workflow that should STOP
     // For workflows that already reached final states, STOP should not take 
into effect
-    if (!finalStates.contains(workflowCtx.getWorkflowState()) && 
TargetState.STOP
-        .equals(targetState)) {
+    if (!finalStates.contains(workflowCtx.getWorkflowState())
+        && TargetState.STOP.equals(targetState)) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
       if (isWorkflowStopped(workflowCtx, workflowCfg)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
@@ -122,8 +121,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
     // monitor if provided
     // Note that COMPLETE and FAILED will be marked in markJobComplete / 
markJobFailed
     // This is to handle TIMED_OUT only
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
-        && isWorkflowFinished(workflowCtx, workflowCfg, 
clusterData.getJobConfigMap())) {
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && 
isWorkflowFinished(workflowCtx,
+        workflowCfg, clusterData.getJobConfigMap(), clusterData)) {
       workflowCtx.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowCtx, workflowCfg);
       clusterData.updateWorkflowContext(workflow, workflowCtx, 
_manager.getHelixDataAccessor());
@@ -148,17 +147,16 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (!isWorkflowReadyForSchedule(workflowCfg)) {
       LOG.info("Workflow " + workflow + " is not ready to schedule");
       // set the timer to trigger future schedule
-      _rebalanceScheduler
-          .scheduleRebalance(_manager, workflow, 
workflowCfg.getStartTime().getTime());
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow,
+          workflowCfg.getStartTime().getTime());
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
     // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx, 
clusterData);
+    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, 
workflowCtx, clusterData);
     if (isReady) {
       // Schedule jobs from this workflow.
-      scheduleJobs(workflow, workflowCfg, workflowCtx, 
clusterData.getJobConfigMap());
+      scheduleJobs(workflow, workflowCfg, workflowCtx, 
clusterData.getJobConfigMap(), clusterData);
     } else {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
@@ -172,7 +170,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
     return buildEmptyAssignment(workflow, currStateOutput);
   }
 
-  private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache 
clusterData, String workflowName) {
+  private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache 
clusterData,
+      String workflowName) {
     WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowName);
     if (workflowCtx == null) {
       WorkflowConfig config = clusterData.getWorkflowConfig(workflowName);
@@ -189,7 +188,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
    * and if it's ready, then just schedule it
    */
   private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap) {
+      WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
+      ClusterDataCache clusterDataCache) {
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     if (scheduleConfig != null && scheduleConfig.isRecurring()) {
       LOG.debug("Jobs from recurring workflow are not schedule-able");
@@ -217,7 +217,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
       }
 
       // check ancestor job status
-      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, 
inCompleteAllJobCount, jobConfigMap)) {
+      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, 
inCompleteAllJobCount, jobConfigMap,
+          clusterDataCache)) {
         JobConfig jobConfig = jobConfigMap.get(job);
         if (jobConfig == null) {
           LOG.error(String.format("The job config is missing for job %s", 
job));
@@ -249,9 +250,9 @@ public class WorkflowRebalancer extends TaskRebalancer {
         }
       }
     }
-    long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(workflow) 
== -1
-        ? Long.MAX_VALUE
-        : _rebalanceScheduler.getRebalanceTime(workflow);
+    long currentScheduledTime =
+        _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
+            : _rebalanceScheduler.getRebalanceTime(workflow);
     if (timeToSchedule < currentScheduledTime) {
       _rebalanceScheduler.scheduleRebalance(_manager, workflow, 
timeToSchedule);
     }
@@ -280,7 +281,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
           admin.getResourceIdealState(_manager.getClusterName(), 
jobConfig.getTargetResource());
       if (targetIs == null) {
         LOG.warn("Target resource does not exist for job " + jobResource);
-        // do not need to fail here, the job will be marked as failure 
immediately when job starts running.
+        // do not need to fail here, the job will be marked as failure 
immediately when job starts
+        // running.
       } else {
         numPartitions = targetIs.getPartitionSet().size();
       }
@@ -329,10 +331,9 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
   /**
    * Check if a workflow is ready to schedule, and schedule a rebalance if it 
is not
-   *
    * @param workflow the Helix resource associated with the workflow
-   * @param workflowCfg  the workflow to check
-   * @param workflowCtx  the current workflow context
+   * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
    * @return true if the workflow is ready for schedule, false if not ready
    */
   private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig 
workflowCfg,
@@ -377,7 +378,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
         long offsetMultiplier = (-delayFromStart) / period;
         long timeToSchedule = period * offsetMultiplier + startTime.getTime();
 
-
         // Now clone the workflow if this clone has not yet been created
         DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -394,8 +394,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
             driver.start(clonedWf);
           } catch (Exception e) {
             LOG.error("Failed to schedule cloned workflow " + newWorkflowName, 
e);
-            _clusterStatusMonitor
-                .updateWorkflowCounters(clonedWf.getWorkflowConfig(), 
TaskState.FAILED);
+            
_clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(),
+                TaskState.FAILED);
           }
           // Persist workflow start regardless of success to avoid retrying 
and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
@@ -422,11 +422,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
   /**
    * Create a new workflow based on an existing one
-   *
-   * @param manager          connection to Helix
+   * @param manager connection to Helix
    * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName  the name of the new workflow
-   * @param newStartTime     a provided start time that deviates from the 
desired start time
+   * @param newWorkflowName the name of the new workflow
+   * @param newStartTime a provided start time that deviates from the desired 
start time
    * @return the cloned workflow, or null if there was a problem cloning the 
existing one
    */
   public static Workflow cloneWorkflow(HelixManager manager, String 
origWorkflowName,
@@ -514,7 +513,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
       for (String job : jobs) {
         _rebalanceScheduler.removeScheduledRebalance(job);
       }
-      if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(), 
_manager.getHelixPropertyStore(), workflow, jobs)) {
+      if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
+          _manager.getHelixPropertyStore(), workflow, jobs)) {
         LOG.warn("Failed to clean up workflow " + workflow);
       }
     } else {
@@ -525,7 +525,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
   /**
    * Clean up all jobs that are COMPLETED and passes its expiry time.
-   *
    * @param workflowConfig
    * @param workflowContext
    */
@@ -537,26 +536,24 @@ public class WorkflowRebalancer extends TaskRebalancer {
     long currentTime = System.currentTimeMillis();
 
     if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + 
purgeInterval <= currentTime) {
-      Set<String> expiredJobs = TaskUtil
-          .getExpiredJobs(_manager.getHelixDataAccessor(), 
_manager.getHelixPropertyStore(),
-              workflowConfig, workflowContext);
+      Set<String> expiredJobs = 
TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(),
+          _manager.getHelixPropertyStore(), workflowConfig, workflowContext);
 
       if (expiredJobs.isEmpty()) {
         LOG.info("No job to purge for the queue " + workflow);
       } else {
         LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
         for (String job : expiredJobs) {
-          if (!TaskUtil
-              .removeJob(_manager.getHelixDataAccessor(), 
_manager.getHelixPropertyStore(), job)) {
+          if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), 
_manager.getHelixPropertyStore(),
+              job)) {
             LOG.warn("Failed to clean up expired and completed jobs from 
workflow " + workflow);
           }
           _rebalanceScheduler.removeScheduledRebalance(job);
         }
-        if (!TaskUtil
-            .removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, 
expiredJobs, true)) {
-          LOG.warn(
-              "Error occurred while trying to remove jobs + " + expiredJobs + 
" from the workflow "
-                  + workflow);
+        if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), 
workflow, expiredJobs,
+            true)) {
+          LOG.warn("Error occurred while trying to remove jobs + " + 
expiredJobs
+              + " from the workflow " + workflow);
         }
         // remove job states in workflowContext.
         workflowContext.removeJobStates(expiredJobs);
@@ -582,4 +579,4 @@ public class WorkflowRebalancer extends TaskRebalancer {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 4e51f80..b13bb61 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -198,7 +198,7 @@ public class AssignableInstance {
       LiveInstance liveInstance) {
     logger.info("Updating configs for AssignableInstance {}", 
_instanceConfig.getInstanceName());
     boolean refreshCapacity = false;
-    if (clusterConfig != null) {
+    if (clusterConfig != null && clusterConfig.getTaskQuotaRatioMap() != null) 
{
       if 
(!clusterConfig.getTaskQuotaRatioMap().equals(_clusterConfig.getTaskQuotaRatioMap()))
 {
         refreshCapacity = true;
       }
@@ -212,7 +212,7 @@ public class AssignableInstance {
             "Cannot update live instance with different instance name. 
Current: {}; new: {}",
             _instanceConfig.getInstanceName(), liveInstance.getInstanceName());
       } else {
-        if 
(!liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap()))
 {
+        if (liveInstance.getResourceCapacityMap() != null && 
!liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap()))
 {
           refreshCapacity = true;
         }
         _liveInstance = liveInstance;
@@ -248,13 +248,17 @@ public class AssignableInstance {
    * @return TaskAssignResult
    * @throws IllegalArgumentException if task is null
    */
-  public TaskAssignResult tryAssign(TaskConfig task, String quotaType)
+  public synchronized TaskAssignResult tryAssign(TaskConfig task, String 
quotaType)
       throws IllegalArgumentException {
     if (task == null) {
       throw new IllegalArgumentException("Task is null!");
     }
 
     if (_currentAssignments.contains(task.getId())) {
+      logger.warn(
+          "Task: {} of quotaType: {} is already assigned to this instance. 
Instance name: {}",
+          task.getId(), quotaType, getInstanceName());
+
       return new TaskAssignResult(task, quotaType, this, false, 0,
           TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED,
           String.format("Task %s is already assigned to this instance. Need to 
release it first",
@@ -266,14 +270,27 @@ public class AssignableInstance {
 
     // Fail when no such resource type
     if (!_totalCapacity.containsKey(resourceType)) {
+
+      logger.warn(
+          "AssignableInstance does not support the given resourceType: {}. 
Task: {}, quotaType: {}, Instance name: {}",
+          resourceType, task.getId(), quotaType, getInstanceName());
+
       return new TaskAssignResult(task, quotaType, this, false, 0,
           TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE,
           String.format("Requested resource type %s not supported. Available 
resource types: %s",
               resourceType, _totalCapacity.keySet()));
     }
 
-    // Fail when no such quota type
+    // Fail when no such quota type. However, if quotaType is null, treat it 
as DEFAULT
+    if (quotaType == null || quotaType.equals("")) {
+      quotaType = DEFAULT_QUOTA_TYPE;
+    }
     if (!_totalCapacity.get(resourceType).containsKey(quotaType)) {
+
+      logger.warn(
+          "AssignableInstance does not support the given quotaType: {}. Task: 
{}, quotaType: {}, Instance name: {}",
+          quotaType, task.getId(), quotaType, getInstanceName());
+
       return new TaskAssignResult(task, quotaType, this, false, 0,
           TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE,
           String.format("Requested quota type %s not defined. Available quota 
types: %s", quotaType,
@@ -285,6 +302,11 @@ public class AssignableInstance {
 
     // Fail with insufficient quota
     if (capacity <= usage) {
+
+      logger.warn(
+          "AssignableInstance does not have enough capacity for quotaType: {}. 
Task: {}, quotaType: {}, Instance name: {}. Current capacity: {} capacity 
needed to schedule: {}",
+          quotaType, task.getId(), quotaType, getInstanceName(), capacity, 
usage);
+
       return new TaskAssignResult(task, quotaType, this, false, 0,
           TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA,
           String.format("Insufficient quota %s::%s. Capacity: %s, Current 
Usage: %s", resourceType,
@@ -305,7 +327,7 @@ public class AssignableInstance {
    * @throws IllegalStateException if TaskAssignResult is not successful or 
the task is double
    *           assigned, or the task is not assigned to this instance
    */
-  public void assign(TaskAssignResult result) throws IllegalStateException {
+  public synchronized void assign(TaskAssignResult result) throws 
IllegalStateException {
     if (!result.isSuccessful()) {
       throw new IllegalStateException("Cannot assign a failed result: " + 
result);
     }
@@ -349,15 +371,22 @@ public class AssignableInstance {
    * Performs the following to release resource for a task:
    * 1. Release the resource by adding back what the task required.
    * 2. Remove the TaskAssignResult from _currentAssignments
+   * Note that if the given quotaType is null, AssignableInstance will try to 
release from DEFAULT
+   * type.
    * @param taskConfig config of this task
    * @param quotaType quota type this task belongs to
    */
-  public void release(TaskConfig taskConfig, String quotaType) {
+  public synchronized boolean release(TaskConfig taskConfig, String quotaType) 
{
     if (!_currentAssignments.contains(taskConfig.getId())) {
       logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(),
           _instanceConfig.getInstanceName());
-      return;
+      return false;
     }
+    if (quotaType == null) {
+      logger.warn("Task {}'s quotaType is null. Trying to release as DEFAULT 
type.", taskConfig.getId());
+      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+
     String resourceType = 
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
 
     // We might be releasing a task whose resource requirement / quota type is 
out-dated,
@@ -366,10 +395,12 @@ public class AssignableInstance {
         && _usedCapacity.get(resourceType).containsKey(quotaType)) {
       int curUsage = _usedCapacity.get(resourceType).get(quotaType);
       _usedCapacity.get(resourceType).put(quotaType, curUsage - 1);
+      _currentAssignments.remove(taskConfig.getId());
+      logger.info("Released task {} from instance {}", taskConfig.getId(),
+          _instanceConfig.getInstanceName());
+      return true;
     }
-    _currentAssignments.remove(taskConfig.getId());
-    logger.info("Released task {} from instance {}", taskConfig.getId(),
-        _instanceConfig.getInstanceName());
+    return false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
index fee54e5..5d749fb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
@@ -71,6 +71,12 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
       logger.warn("No instance to assign!");
       return buildNoInstanceAssignment(tasks, quotaType);
     }
+    if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) 
{
+      // Sometimes null is stored as a String literal
+      logger.warn("Quota type is null. Assigning it as DEFAULT type!");
+      quotaType = DEFAULT_QUOTA_TYPE;
+    }
+
     logger.info("Assigning tasks with quota type {}", quotaType);
 
     // Build a sched queue
@@ -87,6 +93,9 @@ public class ThreadCountBasedTaskAssigner implements 
TaskAssigner {
         continue;
       }
 
+      // TODO: Review this logic
+      // TODO: 1. It assumes that the only mode of failure is due to 
insufficient capacity. This assumption may not always be true. Verify
+      // TODO: 2. All TaskAssignResults will get 
failureReason/Description/TaskID for the first task that failed. This will need 
correction
       // Every time we try to assign the task to the least-used instance, if 
that fails,
       // we assume all subsequent tasks will fail with same reason
       if (lastFailure != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
index 3ad1062..9861518 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
@@ -18,7 +18,7 @@ public class TestBatchEnableInstances extends TaskTestBase {
     _numDbs = 1;
     _numReplicas = 3;
     _numNodes = 5;
-    _numParitions = 4;
+    _numPartitions = 4;
     super.beforeClass();
     _accessor = new ConfigAccessor(_gZkClient);
   }
@@ -31,7 +31,7 @@ public class TestBatchEnableInstances extends TaskTestBase {
 
     ExternalView externalView = _gSetupTool.getClusterManagementTool()
         .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
-    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numPartitions);
     for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
       
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
     }
@@ -48,7 +48,7 @@ public class TestBatchEnableInstances extends TaskTestBase {
 
     ExternalView externalView = _gSetupTool.getClusterManagementTool()
         .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
-    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numPartitions);
     for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
       
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
       
Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName()));
@@ -69,7 +69,7 @@ public class TestBatchEnableInstances extends TaskTestBase {
 
     ExternalView externalView = _gSetupTool.getClusterManagementTool()
         .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
-    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numPartitions);
     int numOfFirstHost = 0;
     for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
       if (stateMap.keySet().contains(_participants[0].getInstanceName())) {
@@ -92,7 +92,7 @@ public class TestBatchEnableInstances extends TaskTestBase {
 
     ExternalView externalView = _gSetupTool.getClusterManagementTool()
         .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
-    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numPartitions);
     int numOfFirstHost = 0;
     for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
       if (stateMap.keySet().contains(_participants[0].getInstanceName())) {
@@ -105,4 +105,4 @@ public class TestBatchEnableInstances extends TaskTestBase {
         Arrays.asList(_participants[0].getInstanceName(), 
_participants[1].getInstanceName()),
         true);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
index a2c87c0..8e669ac 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
@@ -59,7 +59,7 @@ public class TestStateTransitionCancellation extends 
TaskTestBase {
   public void beforeClass() throws Exception {
     _participants = new MockParticipantManager[_numNodes];
     _numDbs = 1;
-    _numParitions = 20;
+    _numPartitions = 20;
     _numNodes = 2;
     _numReplicas = 2;
     _verifier =
@@ -174,7 +174,7 @@ public class TestStateTransitionCancellation extends 
TaskTestBase {
 
     // Either partial of state transitions have been cancelled or all the 
Slave -> Master
     // reassigned to other cluster
-    Assert.assertTrue((numOfMasters > 0 && numOfMasters <= _numParitions));
+    Assert.assertTrue((numOfMasters > 0 && numOfMasters <= _numPartitions));
   }
 
   private void stateCleanUp() {
@@ -292,4 +292,4 @@ public class TestStateTransitionCancellation extends 
TaskTestBase {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
index e643c9a..daa3fc4 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
@@ -19,7 +19,7 @@ public class TestClusterMaintenanceMode extends TaskTestBase {
     _numDbs = 1;
     _numNodes = 3;
     _numReplicas = 3;
-    _numParitions = 5;
+    _numPartitions = 5;
     super.beforeClass();
   }
 
@@ -89,4 +89,4 @@ public class TestClusterMaintenanceMode extends TaskTestBase {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
index 650456b..deca2ea 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
@@ -39,7 +39,7 @@ public class TestTargetExternalView extends TaskTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     _numDbs = 3;
-    _numParitions = 8;
+    _numPartitions = 8;
     _numNodes = 4;
     _numReplicas = 2;
     super.beforeClass();
@@ -92,4 +92,4 @@ public class TestTargetExternalView extends TaskTestBase {
           idealStates.get(i).getRecord().getListFields());
     }
   }
-}
+}
\ No newline at end of file

Reply via email to