http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 1892062..d394931 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -175,6 +175,9 @@ public class JobConfig extends ResourceConfig { public static final long DEFAULT_Job_EXECUTION_DELAY_TIME = -1L; public static final boolean DEFAULT_REBALANCE_RUNNING_TASK = false; + // Cache TaskConfig objects for targeted jobs' tasks to reduce object creation/GC overhead + private Map<String, TaskConfig> _targetedTaskConfigMap = new HashMap<>(); + public JobConfig(HelixProperty property) { super(property.getRecord()); } @@ -263,7 +266,9 @@ public class JobConfig extends ResourceConfig { String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE)); getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(), rebalanceRunningTask); - putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType); + if (quotaType != null) { + putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType); + } } public String getWorkflow() { @@ -355,8 +360,16 @@ public class JobConfig extends ResourceConfig { DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE); } + /** + * Returns the task config map. If the job is targeted, the cached targetedTaskConfigMap is + * returned instead. + * @return a map of task id to {@link TaskConfig} + */ public Map<String, TaskConfig> getTaskConfigMap() { - Map<String, TaskConfig> taskConfigMap = new HashMap<String, TaskConfig>(); + String targetResource = getSimpleConfig(JobConfigProperty.TargetResource.name()); + if (targetResource != null) { + return _targetedTaskConfigMap; + } + Map<String, TaskConfig> taskConfigMap = new HashMap<>(); for (Map.Entry<String, Map<String, String>> entry : getMapConfigs().entrySet()) { taskConfigMap.put(entry.getKey(), new TaskConfig(null, entry.getValue(), entry.getKey(), null)); @@ -364,10 +377,33 @@ public class JobConfig extends ResourceConfig { return taskConfigMap; } + /** + * If the job is targeted, try to get the task from the cached targetedTaskConfigMap first. If it + * is not cached, create a TaskConfig on the fly. + * @param id pName for targeted tasks + * @return a TaskConfig object + */ public TaskConfig getTaskConfig(String id) { + String targetResource = getSimpleConfig(JobConfigProperty.TargetResource.name()); + if (targetResource != null) { + // This is a targeted task. For targeted tasks, id is pName + if (!_targetedTaskConfigMap.containsKey(id)) { + return new TaskConfig(null, null, id, null); + } + return _targetedTaskConfigMap.get(id); + } return new TaskConfig(null, getMapConfig(id), id, null); } + /** + * When a targeted task is assigned for the first time, cache it in JobConfig so that it can be + * retrieved later for release. + * @param pName a concatenation of job name + "_" + task partition number + */ + public void setTaskConfig(String pName, TaskConfig taskConfig) { + _targetedTaskConfigMap.put(pName, taskConfig); + } + public Map<String, String> getResourceConfigMap() { return getSimpleConfigs(); }
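The JobConfig change above is easiest to see end to end. The following is a minimal sketch, not part of the patch, of how the new cache behaves for a targeted job; the builder calls, the workflow/resource names, and the pName are illustrative, and it assumes the builder accepts a targeted job without explicit task configs:

import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskConfig;

public class TargetedTaskConfigCacheSketch {
  public static void main(String[] args) {
    // Hypothetical targeted job; "MyWorkflow", "MyDB", and "Reindex" are made-up names.
    JobConfig jobConfig = new JobConfig.Builder()
        .setWorkflow("MyWorkflow")
        .setTargetResource("MyDB")
        .setCommand("Reindex")
        .build();

    // For targeted tasks, the id passed to getTaskConfig is a pName: <job name>_<partition>.
    String pName = "MyWorkflow_MyJob_0";

    // Not cached yet: getTaskConfig creates a TaskConfig on the fly.
    TaskConfig firstLookup = jobConfig.getTaskConfig(pName);

    // When the task is first assigned, the rebalancer caches its config for later release.
    jobConfig.setTaskConfig(pName, firstLookup);

    // Subsequent lookups return the cached instance instead of allocating a new object.
    TaskConfig secondLookup = jobConfig.getTaskConfig(pName);
    System.out.println("cached instance reused: " + (secondLookup == firstLookup));
  }
}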
http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 11abb25..c49a365 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -40,6 +40,7 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,17 +52,14 @@ import com.google.common.collect.ImmutableMap; */ public class JobRebalancer extends TaskRebalancer { private static final Logger LOG = LoggerFactory.getLogger(JobRebalancer.class); - private static TaskAssignmentCalculator _fixTaskAssignmentCal = - new FixedTargetTaskAssignmentCalculator(); - private static TaskAssignmentCalculator _genericTaskAssignmentCal = - new GenericTaskAssignmentCalculator(); + private static TaskAssignmentCalculator _fixTaskAssignmentCal; + private static TaskAssignmentCalculator _threadCountBasedTaskAssignmentCal; private static final String PREV_RA_NODE = "PreviousResourceAssignment"; @Override - public ResourceAssignment computeBestPossiblePartitionState( - ClusterDataCache clusterData, IdealState taskIs, Resource resource, - CurrentStateOutput currStateOutput) { + public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, + IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { final String jobName = resource.getResourceName(); LOG.debug("Computer Best Partition for job: " + jobName); @@ -93,7 +91,8 @@ public class JobRebalancer extends TaskRebalancer { return buildEmptyAssignment(jobName, currStateOutput); } - // Stop current run of the job if workflow or job is already in final state (failed or completed) + // Stop current run of the job if workflow or job is already in final state (failed or + // completed) TaskState workflowState = workflowCtx.getWorkflowState(); TaskState jobState = workflowCtx.getJobState(jobName); // The job is already in a final state (completed/failed). @@ -114,7 +113,7 @@ public class JobRebalancer extends TaskRebalancer { if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg, workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx), - clusterData.getJobConfigMap())) { + clusterData.getJobConfigMap(), clusterData)) { LOG.info("Job is not ready to run " + jobName); return buildEmptyAssignment(jobName, currStateOutput); } @@ -142,9 +141,9 @@ public class JobRebalancer extends TaskRebalancer { // is stored in zk. // Fetch the previous resource assignment from the property store. This is required because of // HELIX-230. - Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null - ? clusterData.getEnabledLiveInstances() - : clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag()); + Set<String> liveInstances = + jobCfg.getInstanceGroupTag() == null ? 
clusterData.getEnabledLiveInstances() + : clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag()); if (liveInstances.isEmpty()) { LOG.error("No available instance found for job!"); @@ -197,13 +196,13 @@ public class JobRebalancer extends TaskRebalancer { // Update Workflow and Job context in data cache and ZK. clusterData.updateJobContext(jobName, jobCtx, _manager.getHelixDataAccessor()); - clusterData - .updateWorkflowContext(workflowResource, workflowCtx, _manager.getHelixDataAccessor()); + clusterData.updateWorkflowContext(workflowResource, workflowCtx, + _manager.getHelixDataAccessor()); setPrevResourceAssignment(jobName, newAssignment); - LOG.debug("Job " + jobName + " new assignment " + Arrays - .toString(newAssignment.getMappedPartitions().toArray())); + LOG.debug("Job " + jobName + " new assignment " + + Arrays.toString(newAssignment.getMappedPartitions().toArray())); return newAssignment; } @@ -213,7 +212,6 @@ public class JobRebalancer extends TaskRebalancer { CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) { - // Used to keep track of tasks that have already been assigned to instances. Set<Integer> assignedPartitions = new HashSet<>(); @@ -223,12 +221,13 @@ public class JobRebalancer extends TaskRebalancer { // Keeps a mapping of (partition) -> (instance, state) Map<Integer, PartitionAssignment> paMap = new TreeMap<>(); - Set<String> excludedInstances = getExcludedInstances(jobResource, workflowConfig, cache); + Set<String> excludedInstances = + getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache); // Process all the current assignments of tasks. - TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg); - Set<Integer> allPartitions = taskAssignmentCal - .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates()); + TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalculator(jobCfg, cache); + Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, + workflowConfig, workflowCtx, cache.getIdealStates()); if (allPartitions == null || allPartitions.isEmpty()) { // Empty target partitions, mark the job as FAILED. 
@@ -236,7 +235,7 @@ public class JobRebalancer extends TaskRebalancer { "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!"; LOG.info(failureMsg); jobCtx.setInfo(failureMsg); - failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap()); + failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache); markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false); return new ResourceAssignment(jobResource); } @@ -247,23 +246,23 @@ public class JobRebalancer extends TaskRebalancer { long currentTime = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { - LOG.debug( - "All partitions: " + allPartitions + " taskAssignment: " + prevInstanceToTaskAssignments - + " excludedInstances: " + excludedInstances); + LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances); } + // Release resource for tasks in terminal state updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource, currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState, - assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions); + assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache); addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg); if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold() || (jobCfg.getTargetResource() != null - && cache.getIdealState(jobCfg.getTargetResource()) != null && !cache - .getIdealState(jobCfg.getTargetResource()).isEnabled())) { + && cache.getIdealState(jobCfg.getTargetResource()) != null + && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) { if (isJobFinished(jobCtx, jobResource, currStateOutput)) { - failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap()); + failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache); return buildEmptyAssignment(jobResource, currStateOutput); } workflowCtx.setJobState(jobResource, TaskState.FAILING); @@ -286,12 +285,13 @@ public class JobRebalancer extends TaskRebalancer { } if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, currStateOutput)) { - failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap()); + failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache); return buildEmptyAssignment(jobResource, currStateOutput); } if (isJobComplete(jobCtx, allPartitions, jobCfg)) { - markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap()); + markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap(), + cache); _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED, jobCtx.getFinishTime() - jobCtx.getStartTime()); _rebalanceScheduler.removeScheduledRebalance(jobResource); @@ -299,7 +299,8 @@ public class JobRebalancer extends TaskRebalancer { return buildEmptyAssignment(jobResource, currStateOutput); } - // If job is being timed out and no task is running (for whatever reason), idealState can be deleted and all tasks + // If job is being timed out and no task is running (for whatever reason), idealState can be + // deleted and all tasks // can be dropped(note that Helix doesn't track whether the drop is success or not). 
if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) { handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg); @@ -341,8 +342,8 @@ public class JobRebalancer extends TaskRebalancer { String instance = jobContext.getAssignedParticipant(pId); Message pendingMessage = currentStateOutput.getPendingState(jobResource, partition, instance); // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished - if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT - && pendingMessage != null)) { + if (state == TaskPartitionState.RUNNING + || (state == TaskPartitionState.INIT && pendingMessage != null)) { return false; } } @@ -351,28 +352,25 @@ public class JobRebalancer extends TaskRebalancer { /** * Get the last task assignment for a given job - * * @param resourceName the name of the job - * * @return {@link ResourceAssignment} instance, or null if no assignment is available */ private ResourceAssignment getPrevResourceAssignment(String resourceName) { - ZNRecord r = _manager.getHelixPropertyStore() - .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), - null, AccessOption.PERSISTENT); + ZNRecord r = _manager.getHelixPropertyStore().get( + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), + null, AccessOption.PERSISTENT); return r != null ? new ResourceAssignment(r) : null; } /** * Set the last task assignment for a given job - * * @param resourceName the name of the job - * @param ra {@link ResourceAssignment} containing the task assignment + * @param ra {@link ResourceAssignment} containing the task assignment */ private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) { - _manager.getHelixPropertyStore() - .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), - ra.getRecord(), AccessOption.PERSISTENT); + _manager.getHelixPropertyStore().set( + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), + ra.getRecord(), AccessOption.PERSISTENT); } /** @@ -389,7 +387,8 @@ public class JobRebalancer extends TaskRebalancer { if (!isTaskGivenup(ctx, cfg, pId)) { return false; } - // If the task is given up, there's still chance the job has completed because of job failure threshold. + // If the task is given up, there's still a chance the job has completed because of the job + // failure threshold. numOfGivenUpTasks++; } } @@ -398,9 +397,8 @@ public class JobRebalancer extends TaskRebalancer { /** * @param liveInstances - * @param prevAssignment task partition -> (instance -> state) + * @param prevAssignment task partition -> (instance -> state) * @param allTaskPartitions all task partitionIds - * * @return instance -> partitionIds from previous assignment, if the instance is still live */ private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments( @@ -426,7 +424,24 @@ public class JobRebalancer extends TaskRebalancer { return result; } - private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) { - return TaskUtil.isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal; + /** + * If the job is a targeted job, use the FixedTargetTaskAssignmentCalculator. Otherwise, use the + * ThreadCountBasedTaskAssignmentCalculator. Both calculators support quota-based scheduling. 
+ * @param jobConfig + * @param cache + * @return + */ + private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig, + ClusterDataCache cache) { + AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager(); + if (_threadCountBasedTaskAssignmentCal == null) { + _threadCountBasedTaskAssignmentCal = new ThreadCountBasedTaskAssignmentCalculator( + new ThreadCountBasedTaskAssigner(), assignableInstanceManager); + } + if (_fixTaskAssignmentCal == null) { + _fixTaskAssignmentCal = new FixedTargetTaskAssignmentCalculator(assignableInstanceManager); + } + return TaskUtil.isGenericTaskJob(jobConfig) ? _threadCountBasedTaskAssignmentCal + : _fixTaskAssignmentCal; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java index 66b961b..8286257 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java @@ -11,7 +11,7 @@ import java.util.SortedSet; public abstract class TaskAssignmentCalculator { /** - * Get all the partitions that should be created by this task + * Get all the partitions/tasks that belong to this job. * * @param jobCfg the task configuration * @param jobCtx the task context @@ -43,4 +43,4 @@ public abstract class TaskAssignmentCalculator { Collection<String> instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, Map<String, IdealState> idealStateMap); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 1d29368..dae0da6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -52,14 +52,12 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher _manager = manager; } - @Override public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput); /** * Checks if the workflow has been stopped. - * * @param ctx Workflow context containing task states * @param cfg Workflow config containing set of tasks * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise. 
@@ -67,8 +65,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) { for (String job : cfg.getJobDag().getAllNodes()) { TaskState jobState = ctx.getJobState(job); - if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState - .equals(TaskState.STOPPING))) { + if (jobState != null + && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) { return false; } } @@ -92,14 +90,14 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher /** * Check all the dependencies of a job to determine whether the job is ready to be scheduled. - * * @param job * @param workflowCfg * @param workflowCtx * @return */ protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg, - WorkflowContext workflowCtx, int incompleteAllCount, Map<String, JobConfig> jobConfigMap) { + WorkflowContext workflowCtx, int incompleteAllCount, Map<String, JobConfig> jobConfigMap, + ClusterDataCache clusterDataCache) { int notStartedCount = 0; int failedOrTimeoutCount = 0; int incompleteParentCount = 0; @@ -118,8 +116,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher // If there is any parent job not started, this job should not be scheduled if (notStartedCount > 0) { if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount)); + LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job, + notStartedCount)); } return false; } @@ -132,10 +130,10 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher return false; } if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) { - markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap); + markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache); if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount)); + LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job, + failedOrTimeoutCount)); } return false; } @@ -166,7 +164,6 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher /** * Check if a workflow is ready to schedule. - * * @param workflowCfg the workflow to check * @return true if the workflow is ready for schedule, false if not ready */ @@ -177,12 +174,10 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher } @Override - public IdealState computeNewIdealState(String resourceName, - IdealState currentIdealState, CurrentStateOutput currentStateOutput, - ClusterDataCache clusterData) { + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { // All of the heavy lifting is in the ResourceAssignment computation, // so this part can just be a no-op. 
return currentIdealState; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index b3a7f29..68a9d4a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -59,11 +59,10 @@ public class TaskUtil { /** * Parses job resource configurations in Helix into a {@link JobConfig} object. * This method is internal API, please use the corresponding one in TaskDriver.getJobConfig(); - * - * @param accessor Accessor to access Helix configs + * @param accessor Accessor to access Helix configs * @param job The name of the job resource * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null - * otherwise. + * otherwise. */ protected static JobConfig getJobConfig(HelixDataAccessor accessor, String job) { HelixProperty jobResourceConfig = getResourceConfig(accessor, job); @@ -76,11 +75,10 @@ public class TaskUtil { /** * Parses job resource configurations in Helix into a {@link JobConfig} object. * This method is internal API, please use the corresponding one in TaskDriver.getJobConfig(); - * - * @param manager HelixManager object used to connect to Helix. + * @param manager HelixManager object used to connect to Helix. * @param job The name of the job resource. * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null - * otherwise. + * otherwise. */ protected static JobConfig getJobConfig(HelixManager manager, String job) { return getJobConfig(manager.getHelixDataAccessor(), job); @@ -88,11 +86,9 @@ public class TaskUtil { /** * Set the job config - * - * @param accessor Accessor to Helix configs - * @param job The job name + * @param accessor Accessor to Helix configs + * @param job The job name * @param jobConfig The job config to be set - * * @return True if set successfully, otherwise false */ protected static boolean setJobConfig(HelixDataAccessor accessor, String job, @@ -102,10 +98,8 @@ public class TaskUtil { /** * Remove a job config. - * * @param accessor * @param job - * * @return */ protected static boolean removeJobConfig(HelixDataAccessor accessor, String job) { @@ -114,12 +108,12 @@ public class TaskUtil { /** * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. - * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowConfig(); - * - * @param accessor Accessor to access Helix configs + * This method is internal API, please use the corresponding one in + * TaskDriver.getWorkflowConfig(); + * @param accessor Accessor to access Helix configs * @param workflow The name of the workflow. * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the - * workflow, null otherwise. + * workflow, null otherwise. */ protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor accessor, String workflow) { HelixProperty workflowCfg = getResourceConfig(accessor, workflow); @@ -132,12 +126,12 @@ public class TaskUtil { /** * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. 
- * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowConfig(); - * - * @param manager Helix manager object used to connect to Helix. + * This method is internal API, please use the corresponding one in + * TaskDriver.getWorkflowConfig(); + * @param manager Helix manager object used to connect to Helix. * @param workflow The name of the workflow resource. * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the - * workflow, null otherwise. + * workflow, null otherwise. */ protected static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) { return getWorkflowConfig(manager.getHelixDataAccessor(), workflow); @@ -145,10 +139,10 @@ public class TaskUtil { /** * Set the workflow config - * @param accessor Accessor to Helix configs - * @param workflow The workflow name - * @param workflowConfig The workflow config to be set - * @return True if set successfully, otherwise false + * @param accessor Accessor to Helix configs + * @param workflow The workflow name + * @param workflowConfig The workflow config to be set + * @return True if set successfully, otherwise false */ protected static boolean setWorkflowConfig(HelixDataAccessor accessor, String workflow, WorkflowConfig workflowConfig) { @@ -167,9 +161,8 @@ public class TaskUtil { /** * Get a Helix configuration scope at a resource (i.e. job and workflow) level - * * @param clusterName the cluster containing the resource - * @param resource the resource name + * @param resource the resource name * @return instantiated {@link HelixConfigScope} */ protected static HelixConfigScope getResourceConfigScope(String clusterName, String resource) { @@ -180,24 +173,22 @@ public class TaskUtil { /** * Get the runtime context of a single job. * This method is internal API, please use TaskDriver.getJobContext(); - * * @param propertyStore Property store for the cluster - * @param jobResource The name of the job + * @param jobResource The name of the job * @return the {@link JobContext}, or null if none is available */ protected static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore, String jobResource) { - ZNRecord r = propertyStore - .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), - null, AccessOption.PERSISTENT); + ZNRecord r = propertyStore.get( + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), null, + AccessOption.PERSISTENT); return r != null ? new JobContext(r) : null; } /** * Get the runtime context of a single job. 
* This method is internal API, please use TaskDriver.getJobContext(); - * - * @param manager a connection to Helix + * @param manager a connection to Helix * @param jobResource the name of the job * @return the {@link JobContext}, or null if none is available */ @@ -208,24 +199,22 @@ public class TaskUtil { /** * Set the runtime context of a single job * This method is internal API; - * - * @param manager a connection to Helix + * @param manager a connection to Helix * @param jobResource the name of the job - * @param ctx the up-to-date {@link JobContext} for the job + * @param ctx the up-to-date {@link JobContext} for the job */ protected static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) { - manager.getHelixPropertyStore() - .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), - ctx.getRecord(), AccessOption.PERSISTENT); + manager.getHelixPropertyStore().set( + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), + ctx.getRecord(), AccessOption.PERSISTENT); } /** * Remove the runtime context of a single job. * This method is internal API. - * - * @param manager A connection to Helix + * @param manager A connection to Helix * @param jobResource The name of the job - * @return True if remove success, otherwise false + * @return True if remove success, otherwise false */ protected static boolean removeJobContext(HelixManager manager, String jobResource) { return removeJobContext(manager.getHelixPropertyStore(), jobResource); @@ -234,10 +223,9 @@ public class TaskUtil { /** * Remove the runtime context of a single job. * This method is internal API. - * * @param propertyStore Property store for the cluster - * @param job The name of the job - * @return True if remove success, otherwise false + * @param job The name of the job + * @return True if remove success, otherwise false */ protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore, String job) { @@ -246,25 +234,25 @@ public class TaskUtil { /** * Get the runtime context of a single workflow. - * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext(); - * - * @param propertyStore Property store of the cluster + * This method is internal API, please use the corresponding one in + * TaskDriver.getWorkflowContext(); + * @param propertyStore Property store of the cluster * @param workflow The name of the workflow * @return the {@link WorkflowContext}, or null if none is available */ protected static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore, String workflow) { ZNRecord r = propertyStore.get( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE), - null, AccessOption.PERSISTENT); + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE), null, + AccessOption.PERSISTENT); return r != null ? new WorkflowContext(r) : null; } /** * Get the runtime context of a single workflow. 
- * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext(); - * - * @param manager a connection to Helix + * This method is internal API, please use the corresponding one in + * TaskDriver.getWorkflowContext(); + * @param manager a connection to Helix * @param workflow the name of the workflow * @return the {@link WorkflowContext}, or null if none is available */ @@ -274,10 +262,9 @@ public class TaskUtil { /** * Set the runtime context of a single workflow - * - * @param manager a connection to Helix + * @param manager a connection to Helix * @param workflow the name of the workflow - * @param workflowContext the up-to-date {@link WorkflowContext} for the workflow + * @param workflowContext the up-to-date {@link WorkflowContext} for the workflow */ protected static void setWorkflowContext(HelixManager manager, String workflow, WorkflowContext workflowContext) { @@ -289,10 +276,9 @@ public class TaskUtil { /** * Remove the runtime context of a single workflow. * This method is internal API. - * - * @param manager A connection to Helix + * @param manager A connection to Helix * @param workflow The name of the workflow - * @return True if remove success, otherwise false + * @return True if remove success, otherwise false */ protected static boolean removeWorkflowContext(HelixManager manager, String workflow) { return removeWorkflowContext(manager.getHelixPropertyStore(), workflow); @@ -301,10 +287,9 @@ public class TaskUtil { /** * Remove the runtime context of a single workflow. * This method is internal API. - * - * @param propertyStore Property store for the cluster - * @param workflow The name of the workflow - * @return True if remove success, otherwise false + * @param propertyStore Property store for the cluster + * @param workflow The name of the workflow + * @return True if remove success, otherwise false */ protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore, String workflow) { @@ -313,49 +298,46 @@ public class TaskUtil { /** * Intialize the user content store znode setup - * @param propertyStore zookeeper property store + * @param propertyStore zookeeper property store * @param workflowJobResource the name of workflow or job - * @param record the initial data + * @param record the initial data */ - protected static void createUserContent(HelixPropertyStore propertyStore, String workflowJobResource, - ZNRecord record) { - propertyStore.create(Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, - TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT); + protected static void createUserContent(HelixPropertyStore propertyStore, + String workflowJobResource, ZNRecord record) { + propertyStore.create(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, + workflowJobResource, TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT); } /** * Get user defined workflow or job level key-value pair data - * - * @param manager a connection to Helix + * @param manager a connection to Helix * @param workflowJobResource the name of workflow - * @param key the key of key-value pair - * + * @param key the key of key-value pair * @return null if there is no such pair, otherwise return a String */ protected static String getWorkflowJobUserContent(HelixManager manager, String workflowJobResource, String key) { ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), - null, 
AccessOption.PERSISTENT); + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null, + AccessOption.PERSISTENT); return r != null ? r.getSimpleField(key) : null; } /** * Add an user defined key-value pair data to workflow or job level - * - * @param manager a connection to Helix + * @param manager a connection to Helix * @param workflowJobResource the name of workflow or job - * @param key the key of key-value pair - * @param value the value of key-value pair + * @param key the key of key-value pair + * @param value the value of key-value pair */ protected static void addWorkflowJobUserContent(final HelixManager manager, String workflowJobResource, final String key, final String value) { - String path = Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE); + String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, + USER_CONTENT_NODE); manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord znRecord) { + @Override + public ZNRecord update(ZNRecord znRecord) { znRecord.setSimpleField(key, value); return znRecord; } @@ -364,32 +346,27 @@ public class TaskUtil { /** * Get user defined task level key-value pair data - * - * @param manager a connection to Helix - * @param job the name of job + * @param manager a connection to Helix + * @param job the name of job * @param task the name of the task - * @param key the key of key-value pair - * + * @param key the key of key-value pair * @return null if there is no such pair, otherwise return a String */ - protected static String getTaskUserContent(HelixManager manager, String job, - String task, String key) { + protected static String getTaskUserContent(HelixManager manager, String job, String task, + String key) { ZNRecord r = manager.getHelixPropertyStore().get( Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null, AccessOption.PERSISTENT); - return r != null ? (r.getMapField(task) != null - ? r.getMapField(task).get(key) - : null) : null; + return r != null ? (r.getMapField(task) != null ? 
r.getMapField(task).get(key) : null) : null; } /** * Add an user defined key-value pair data to task level - * - * @param manager a connection to Helix - * @param job the name of job - * @param task the name of task - * @param key the key of key-value pair - * @param value the value of key-value pair + * @param manager a connection to Helix + * @param job the name of job + * @param task the name of task + * @param key the key of key-value pair + * @param value the value of key-value pair */ protected static void addTaskUserContent(final HelixManager manager, String job, final String task, final String key, final String value) { @@ -397,7 +374,8 @@ public class TaskUtil { Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE); manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord znRecord) { + @Override + public ZNRecord update(ZNRecord znRecord) { if (znRecord.getMapField(task) == null) { znRecord.setMapField(task, new HashMap<String, String>()); } @@ -406,9 +384,9 @@ public class TaskUtil { } }, AccessOption.PERSISTENT); } + /** * Get a workflow-qualified job name for a single-job workflow - * * @param singleJobWorkflow the name of the single-job workflow * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow */ @@ -418,9 +396,8 @@ public class TaskUtil { /** * Get a workflow-qualified job name for a job in that workflow - * * @param workflow the name of the workflow - * @param jobName the un-namespaced name of the job + * @param jobName the un-namespaced name of the job * @return The namespaced job name, which is just workflowResource_jobName */ public static String getNamespacedJobName(String workflow, String jobName) { @@ -429,9 +406,8 @@ public class TaskUtil { /** * Remove the workflow namespace from the job name - * * @param workflow the name of the workflow that owns the job - * @param jobName the namespaced job name + * @param jobName the namespaced job name * @return the denamespaced job name, or the same job name if it is already denamespaced */ public static String getDenamespacedJobName(String workflow, String jobName) { @@ -445,7 +421,6 @@ public class TaskUtil { /** * Serialize a map of job-level configurations as a single string - * * @param commandConfig map of job config key to config value * @return serialized string */ @@ -464,7 +439,6 @@ public class TaskUtil { /** * Deserialize a single string into a map of job-level configurations - * * @param commandConfig the serialized job config map * @return a map of job config key to config value */ @@ -485,7 +459,6 @@ public class TaskUtil { /** * Extracts the partition id from the given partition name. - * * @param pName * @return */ @@ -504,27 +477,27 @@ public class TaskUtil { } @Deprecated - public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, String workflow) { + public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, + String workflow) { return accessor.keyBuilder().resourceConfig(workflow); } /** * Cleans up IdealState and external view associated with a job. 
- * * @param accessor * @param job - * @return True if remove success, otherwise false + * @return True if remove success, otherwise false */ - protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, String job) { + protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, + String job) { return cleanupIdealStateExtView(accessor, job); } /** * Cleans up IdealState and external view associated with a workflow. - * * @param accessor * @param workflow - * @return True if remove success, otherwise false + * @return True if remove success, otherwise false */ protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor, String workflow) { @@ -565,56 +538,49 @@ public class TaskUtil { * Remove a workflow and all jobs for the workflow. This removes the workflow config, idealstate, * externalview and workflow contexts associated with this workflow, and all jobs information, * including their configs, context, IS and EV. - * * @param accessor * @param propertyStore * @param workflow the workflow name. - * @param jobs all job names in this workflow. - * - * @return True if remove success, otherwise false + * @param jobs all job names in this workflow. + * @return True if remove success, otherwise false */ - protected static boolean removeWorkflow(final HelixDataAccessor accessor, final HelixPropertyStore propertyStore, - String workflow, Set<String> jobs) { - boolean success = true; - + protected static boolean removeWorkflow(final HelixDataAccessor accessor, + final HelixPropertyStore propertyStore, String workflow, Set<String> jobs) { // clean up all jobs for (String job : jobs) { if (!removeJob(accessor, propertyStore, job)) { - success = false; + return false; } } if (!removeWorkflowConfig(accessor, workflow)) { LOG.warn( String.format("Error occurred while trying to remove workflow config for %s.", workflow)); - success = false; + return false; } if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) { - LOG.warn(String - .format("Error occurred while trying to remove workflow idealstate/externalview for %s.", - workflow)); - success = false; + LOG.warn(String.format( + "Error occurred while trying to remove workflow idealstate/externalview for %s.", + workflow)); + return false; } if (!removeWorkflowContext(propertyStore, workflow)) { - LOG.warn(String - .format("Error occurred while trying to remove workflow context for %s.", workflow)); - success = false; + LOG.warn(String.format("Error occurred while trying to remove workflow context for %s.", + workflow)); + return false; } - - return success; + return true; } /** * Remove a set of jobs from a workflow. This removes the config, context, IS and EV associated * with each individual job, and removes all the jobs from the WorkflowConfig, and job states from * WorkflowContext. 
- * * @param dataAccessor * @param propertyStore * @param jobs * @param workflow * @param maintainDependency - * * @return True if remove success, otherwise false */ protected static boolean removeJobsFromWorkflow(final HelixDataAccessor dataAccessor, @@ -627,9 +593,8 @@ public class TaskUtil { success = false; } if (!removeJobsState(propertyStore, workflow, jobs)) { - LOG.warn( - "Error occurred while trying to remove jobs states from workflow + " + workflow + " jobs " - + jobs); + LOG.warn("Error occurred while trying to remove jobs states from workflow + " + workflow + + " jobs " + jobs); success = false; } for (String job : jobs) { @@ -643,12 +608,10 @@ public class TaskUtil { /** * Return all jobs that are COMPLETED and passes its expiry time. - * * @param dataAccessor * @param propertyStore * @param workflowConfig * @param workflowContext - * * @return */ protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor, @@ -685,8 +648,8 @@ public class TaskUtil { success = false; } if (!cleanupJobIdealStateExtView(accessor, job)) { - LOG.warn(String - .format("Error occurred while trying to remove job idealstate/externalview for %s.", job)); + LOG.warn(String.format( + "Error occurred while trying to remove job idealstate/externalview for %s.", job)); success = false; } if (!removeJobContext(propertyStore, job)) { @@ -699,8 +662,8 @@ public class TaskUtil { /** Remove the job name from the DAG from the queue configuration */ // Job name should be namespaced job name here. - protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow, - final Set<String> jobsToRemove, final boolean maintainDependency) { + protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, + final String workflow, final Set<String> jobsToRemove, final boolean maintainDependency) { // Now atomically clear the DAG DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { @Override @@ -716,8 +679,8 @@ public class TaskUtil { jobDag.removeNode(job, maintainDependency); } try { - currentData - .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson()); + currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), + jobDag.toJson()); } catch (IOException e) { throw new IllegalArgumentException(e); } @@ -749,7 +712,8 @@ public class TaskUtil { } DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord currentData) { + @Override + public ZNRecord update(ZNRecord currentData) { if (currentData != null) { WorkflowContext workflowContext = new WorkflowContext(currentData); workflowContext.removeJobStates(jobs); @@ -782,7 +746,6 @@ public class TaskUtil { /** * Remove workflow or job config. 
- * * @param accessor * @param workflowJobResource the workflow or job name */ @@ -803,10 +766,10 @@ public class TaskUtil { /** * Set the resource config - * @param accessor Accessor to Helix configs - * @param resource The resource name - * @param resourceConfig The resource config to be set - * @return True if set successfully, otherwise false + * @param accessor Accessor to Helix configs + * @param resource The resource name + * @param resourceConfig The resource config to be set + * @return True if set successfully, otherwise false */ private static boolean setResourceConfig(HelixDataAccessor accessor, String resource, ResourceConfig resourceConfig) { @@ -830,16 +793,19 @@ public class TaskUtil { return nonReadyPartitions; } + /** + * Returns whether a given job is a generic job (not a targeted job). + * @param jobConfig + * @return True if the job is generic (no target resource), otherwise false + */ public static boolean isGenericTaskJob(JobConfig jobConfig) { - Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap(); - return taskConfigMap != null && !taskConfigMap.isEmpty(); + // Targeted jobs may have TaskConfigs, so we check whether the target resource is set + return jobConfig.getTargetResource() == null || jobConfig.getTargetResource().equals(""); } /** * Check whether tasks are just started or still running - * * @param jobContext The job context - * * @return False if still tasks not in final state. Otherwise return true */ public static boolean checkJobStopped(JobContext jobContext) { @@ -852,10 +818,8 @@ public class TaskUtil { return true; } - /** * Count the number of jobs in a workflow that are not in final state. - * * @param workflowCfg * @param workflowCtx * @return @@ -876,4 +840,4 @@ public class TaskUtil { TaskState jobState = workflowContext.getJobState(job); return (jobState != null && jobState != TaskState.NOT_STARTED); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java new file mode 100644 index 0000000..56221eb --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java @@ -0,0 +1,160 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.task.assigner.AssignableInstance; +import org.apache.helix.task.assigner.TaskAssignResult; +import org.apache.helix.task.assigner.TaskAssigner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ThreadCountBasedTaskAssignmentCalculator is an implementation of TaskAssignmentCalculator. It + * serves as a wrapper around ThreadCountBasedTaskAssigner so that it can be used in the existing + * Task Framework Controller pipeline (WorkflowRebalancer and JobRebalancer). + */ +public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalculator { + private static final Logger LOG = + LoggerFactory.getLogger(ThreadCountBasedTaskAssignmentCalculator.class); + private TaskAssigner _taskAssigner; + private AssignableInstanceManager _assignableInstanceManager; + + /** + * Constructor for ThreadCountBasedTaskAssignmentCalculator. Requires an instance of + * ThreadCountBasedTaskAssigner and the up-to-date AssignableInstanceManager. + * @param taskAssigner the ThreadCountBasedTaskAssigner that performs the actual assignment + * @param assignableInstanceManager the manager holding up-to-date AssignableInstances + */ + public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner, + AssignableInstanceManager assignableInstanceManager) { + _taskAssigner = taskAssigner; + _assignableInstanceManager = assignableInstanceManager; + } + + @Override + public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Map<String, IdealState> idealStateMap) { + Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap(); + Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap(); + for (TaskConfig taskCfg : taskMap.values()) { + String taskId = taskCfg.getId(); + int nextPartition = jobCtx.getPartitionSet().size(); + if (!taskIdMap.containsKey(taskId)) { + jobCtx.setTaskIdForPartition(nextPartition, taskId); + } + } + return jobCtx.getPartitionSet(); + } + + @Override + public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, + ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, + JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) { + + if (jobCfg.getTargetResource() != null) { + LOG.error( + "Target resource is not null, should call FixedTargetTaskAssignmentCalculator, target resource : {}", + jobCfg.getTargetResource()); + return new HashMap<>(); + } + + // Get AssignableInstances + Iterable<AssignableInstance> assignableInstances = + _assignableInstanceManager.getAssignableInstanceMap().values(); + + // Convert the filtered partitionSet (partition numbers) to TaskConfigs + Iterable<TaskConfig> taskConfigs = getFilteredTaskConfigs(partitionSet, jobCfg, jobContext); + + // Get the quota type to assign tasks to + String quotaType = jobCfg.getQuotaType(); + + // Assign tasks to AssignableInstances + Map<String, TaskAssignResult> taskAssignResultMap = + _taskAssigner.assignTasks(assignableInstances, taskConfigs, quotaType); + + // TODO: Do this when Quota Manager is ready + // Cache TaskAssignResultMap to prevent double-assign + // This will be used in AbstractTaskDispatcher to release 
tasks that aren't actually + // scheduled/throttled + // _assignableInstanceManager.getTaskAssignResultMap().putAll(taskAssignResultMap); + + // Get TaskId->PartitionNumber mappings for conversion + Map<String, Integer> taskIdPartitionMap = jobContext.getTaskIdPartitionMap(); + + // Instantiate the result map that maps instance to set of task (partition) mappings + Map<String, SortedSet<Integer>> taskAssignment = new HashMap<>(); + + // Loop through all TaskAssignResults and convert the result to the format compliant to + // TaskAssignmentCalculator's API + for (Map.Entry<String, TaskAssignResult> assignResultEntry : taskAssignResultMap.entrySet()) { + TaskAssignResult taskAssignResult = assignResultEntry.getValue(); + + if (taskAssignResult.isSuccessful()) { + String instanceName = taskAssignResult.getInstanceName(); + String taskId = taskAssignResult.getTaskConfig().getId(); + // Since return value contains SortedSet<Integer> which is a set of partition numbers, we + // must convert the taskID (given in TaskAssignResult) to its corresponding partition + // number using taskIdPartitionMap found in JobContext + if (!taskIdPartitionMap.containsKey(taskId)) { + LOG.warn( + "Task is not found in taskIdPartitionMap. Skipping this task! JobID: {}, TaskID: {}", + jobCfg.getJobId(), taskId); + continue; + } + int partitionNumberForTask = taskIdPartitionMap.get(taskId); + if (!taskAssignment.containsKey(instanceName)) { + taskAssignment.put(instanceName, new TreeSet<Integer>()); + } + taskAssignment.get(instanceName).add(partitionNumberForTask); + } + } + return taskAssignment; + } + + /** + * Returns TaskConfigs whose partition numbers (ids) are present in filteredPartitionNumbers. This + * means that these tasks should have the state of INIT, RUNNING, or null. This function basically + * converts partition numbers to corresponding TaskConfigs. 
+ * @param filteredPartitionNumbers + * @param jobConfig + * @param jobContext + * @return TaskConfigs corresponding to the given partition numbers + */ + private Iterable<TaskConfig> getFilteredTaskConfigs(Set<Integer> filteredPartitionNumbers, + JobConfig jobConfig, JobContext jobContext) { + Set<TaskConfig> filteredTaskConfigs = new HashSet<>(); + for (int partitionNumber : filteredPartitionNumbers) { + String taskId = jobContext.getTaskIdForPartition(partitionNumber); + TaskConfig taskConfig = jobConfig.getTaskConfig(taskId); + filteredTaskConfigs.add(taskConfig); + } + return filteredTaskConfigs; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 32eed2c..32a5370 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -55,8 +55,7 @@ public class WorkflowRebalancer extends TaskRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WorkflowRebalancer.class); private static final Set<TaskState> finalStates = new HashSet<>( - Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT) - ); + Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT)); @Override public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, @@ -78,7 +77,7 @@ public class WorkflowRebalancer extends TaskRebalancer { TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); - cleanupWorkflow(workflow, workflowCfg); + cleanupWorkflow(workflow, workflowCfg); return buildEmptyAssignment(workflow, currStateOutput); } @@ -89,8 +88,8 @@ public class WorkflowRebalancer extends TaskRebalancer { // If timeout point has already been passed, it will not be scheduled scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout()); - if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout( - workflowCtx.getStartTime(), workflowCfg.getTimeout())) { + if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) + && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { workflowCtx.setWorkflowState(TaskState.TIMED_OUT); clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor()); } @@ -104,8 +103,8 @@ public class WorkflowRebalancer extends TaskRebalancer { // Step 3: handle workflow that should STOP // For workflows that already reached final states, STOP should not take into effect - if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP - .equals(targetState)) { + if (!finalStates.contains(workflowCtx.getWorkflowState()) + && TargetState.STOP.equals(targetState)) { LOG.info("Workflow " + workflow + "is marked as stopped."); if (isWorkflowStopped(workflowCtx, workflowCfg)) { workflowCtx.setWorkflowState(TaskState.STOPPED); @@ -122,8 +121,8 @@ public class WorkflowRebalancer extends TaskRebalancer { // monitor if provided // Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed // This is to handle TIMED_OUT only - if 
(workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED - && isWorkflowFinished(workflowCtx, workflowCfg, clusterData.getJobConfigMap())) { + if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx, + workflowCfg, clusterData.getJobConfigMap(), clusterData)) { workflowCtx.setFinishTime(currentTime); updateWorkflowMonitor(workflowCtx, workflowCfg); clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor()); @@ -148,17 +147,16 @@ public class WorkflowRebalancer extends TaskRebalancer { if (!isWorkflowReadyForSchedule(workflowCfg)) { LOG.info("Workflow " + workflow + " is not ready to schedule"); // set the timer to trigger future schedule - _rebalanceScheduler - .scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime()); + _rebalanceScheduler.scheduleRebalance(_manager, workflow, + workflowCfg.getStartTime().getTime()); return buildEmptyAssignment(workflow, currStateOutput); } // Check for readiness, and stop processing if it's not ready - boolean isReady = - scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx, clusterData); + boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx, clusterData); if (isReady) { // Schedule jobs from this workflow. - scheduleJobs(workflow, workflowCfg, workflowCtx, clusterData.getJobConfigMap()); + scheduleJobs(workflow, workflowCfg, workflowCtx, clusterData.getJobConfigMap(), clusterData); } else { LOG.debug("Workflow " + workflow + " is not ready to be scheduled."); } @@ -172,7 +170,8 @@ public class WorkflowRebalancer extends TaskRebalancer { return buildEmptyAssignment(workflow, currStateOutput); } - private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache clusterData, String workflowName) { + private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache clusterData, + String workflowName) { WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowName); if (workflowCtx == null) { WorkflowConfig config = clusterData.getWorkflowConfig(workflowName); @@ -189,7 +188,8 @@ public class WorkflowRebalancer extends TaskRebalancer { * and if it's ready, then just schedule it */ private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, - WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap) { + WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap, + ClusterDataCache clusterDataCache) { ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); if (scheduleConfig != null && scheduleConfig.isRecurring()) { LOG.debug("Jobs from recurring workflow are not schedule-able"); @@ -217,7 +217,8 @@ public class WorkflowRebalancer extends TaskRebalancer { } // check ancestor job status - if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap)) { + if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap, + clusterDataCache)) { JobConfig jobConfig = jobConfigMap.get(job); if (jobConfig == null) { LOG.error(String.format("The job config is missing for job %s", job)); @@ -249,9 +250,9 @@ public class WorkflowRebalancer extends TaskRebalancer { } } } - long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(workflow) == -1 - ? Long.MAX_VALUE - : _rebalanceScheduler.getRebalanceTime(workflow); + long currentScheduledTime = + _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? 
Long.MAX_VALUE + : _rebalanceScheduler.getRebalanceTime(workflow); if (timeToSchedule < currentScheduledTime) { _rebalanceScheduler.scheduleRebalance(_manager, workflow, timeToSchedule); } @@ -280,7 +281,8 @@ admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource()); if (targetIs == null) { LOG.warn("Target resource does not exist for job " + jobResource); - // do not need to fail here, the job will be marked as failure immediately when job starts running. + // no need to fail here; the job will be marked as failed immediately when it starts + // running. } else { numPartitions = targetIs.getPartitionSet().size(); } @@ -329,10 +331,9 @@ /** * Check if a workflow is ready to schedule, and schedule a rebalance if it is not - * * @param workflow the Helix resource associated with the workflow - * @param workflowCfg the workflow to check - * @param workflowCtx the current workflow context + * @param workflowCfg the workflow config to check + * @param workflowCtx the current workflow context * @return true if the workflow is ready for schedule, false if not ready */ private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg, @@ -377,7 +378,6 @@ long offsetMultiplier = (-delayFromStart) / period; long timeToSchedule = period * offsetMultiplier + startTime.getTime(); - // Now clone the workflow if this clone has not yet been created DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss"); df.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -394,8 +394,8 @@ driver.start(clonedWf); } catch (Exception e) { LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e); - _clusterStatusMonitor - .updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED); + _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), + TaskState.FAILED); } // Persist workflow start regardless of success to avoid retrying and failing workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName); @@ -422,11 +422,10 @@ /** * Create a new workflow based on an existing one - * - * @param manager connection to Helix + * @param manager connection to Helix * @param origWorkflowName the name of the existing workflow - * @param newWorkflowName the name of the new workflow - * @param newStartTime a provided start time that deviates from the desired start time + * @param newWorkflowName the name of the new workflow + * @param newStartTime a provided start time that deviates from the desired start time * @return the cloned workflow, or null if there was a problem cloning the existing one */ public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName, @@ -514,7 +513,8 @@ for (String job : jobs) { _rebalanceScheduler.removeScheduledRebalance(job); } - if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), workflow, jobs)) { + if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(), + _manager.getHelixPropertyStore(), workflow, jobs)) { LOG.warn("Failed to clean up workflow " + workflow); } } else { @@ -525,7 +525,6 @@ /** * Clean up all jobs that are COMPLETED and have passed their expiry time.
- * * @param workflowConfig the workflow's config * @param workflowContext the workflow's context */ @@ -537,26 +536,24 @@ long currentTime = System.currentTimeMillis(); if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) { - Set<String> expiredJobs = TaskUtil - .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), - workflowConfig, workflowContext); + Set<String> expiredJobs = TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(), + _manager.getHelixPropertyStore(), workflowConfig, workflowContext); if (expiredJobs.isEmpty()) { LOG.info("No job to purge for the queue " + workflow); } else { LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow); for (String job : expiredJobs) { - if (!TaskUtil - .removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), job)) { + if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), + job)) { LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow); } _rebalanceScheduler.removeScheduledRebalance(job); } - if (!TaskUtil - .removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, true)) { - LOG.warn( - "Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow " - + workflow); + if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, + true)) { + LOG.warn("Error occurred while trying to remove jobs " + expiredJobs + + " from the workflow " + workflow); } // remove job states in workflowContext. workflowContext.removeJobStates(expiredJobs); @@ -582,4 +579,4 @@ // Nothing to do here with workflow resource. return currentIdealState; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java index 4e51f80..b13bb61 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java @@ -198,7 +198,7 @@ LiveInstance liveInstance) { logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName()); boolean refreshCapacity = false; - if (clusterConfig != null) { + if (clusterConfig != null && clusterConfig.getTaskQuotaRatioMap() != null) { if (!clusterConfig.getTaskQuotaRatioMap().equals(_clusterConfig.getTaskQuotaRatioMap())) { refreshCapacity = true; } @@ -212,7 +212,7 @@ "Cannot update live instance with different instance name.
Current: {}; new: {}", _instanceConfig.getInstanceName(), liveInstance.getInstanceName()); } else { - if (!liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap())) { + if (liveInstance.getResourceCapacityMap() != null + && !liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap())) { refreshCapacity = true; } _liveInstance = liveInstance; @@ -248,13 +248,17 @@ * @return TaskAssignResult * @throws IllegalArgumentException if task is null */ - public TaskAssignResult tryAssign(TaskConfig task, String quotaType) + public synchronized TaskAssignResult tryAssign(TaskConfig task, String quotaType) throws IllegalArgumentException { if (task == null) { throw new IllegalArgumentException("Task is null!"); } if (_currentAssignments.contains(task.getId())) { + logger.warn( + "Task: {} of quotaType: {} is already assigned to this instance. Instance name: {}", + task.getId(), quotaType, getInstanceName()); + return new TaskAssignResult(task, quotaType, this, false, 0, TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED, String.format("Task %s is already assigned to this instance. Need to release it first", @@ -266,14 +270,27 @@ // Fail when no such resource type if (!_totalCapacity.containsKey(resourceType)) { + + logger.warn( + "AssignableInstance does not support the given resourceType: {}. Task: {}, quotaType: {}, Instance name: {}", + resourceType, task.getId(), quotaType, getInstanceName()); + return new TaskAssignResult(task, quotaType, this, false, 0, TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE, String.format("Requested resource type %s not supported. Available resource types: %s", resourceType, _totalCapacity.keySet())); } - // Fail when no such quota type + // Fail when no such quota type. However, if quotaType is null, treat it as DEFAULT + if (quotaType == null || quotaType.equals("")) { + quotaType = DEFAULT_QUOTA_TYPE; + } if (!_totalCapacity.get(resourceType).containsKey(quotaType)) { + + logger.warn( + "AssignableInstance does not support the given quotaType: {}. Task: {}, Instance name: {}", + quotaType, task.getId(), getInstanceName()); + return new TaskAssignResult(task, quotaType, this, false, 0, TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE, String.format("Requested quota type %s not defined. Available quota types: %s", quotaType, @@ -285,6 +302,11 @@ // Fail with insufficient quota if (capacity <= usage) { + + logger.warn( + "AssignableInstance does not have enough capacity for quotaType: {}. Task: {}, Instance name: {}. Current capacity: {}, current usage: {}", + quotaType, task.getId(), getInstanceName(), capacity, usage); + return new TaskAssignResult(task, quotaType, this, false, 0, TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, String.format("Insufficient quota %s::%s.
Capacity: %s, Current Usage: %s", resourceType, @@ -305,7 +327,7 @@ * @throws IllegalStateException if TaskAssignResult is not successful or the task is double * assigned, or the task is not assigned to this instance */ - public void assign(TaskAssignResult result) throws IllegalStateException { + public synchronized void assign(TaskAssignResult result) throws IllegalStateException { if (!result.isSuccessful()) { throw new IllegalStateException("Cannot assign a failed result: " + result); } @@ -349,15 +371,22 @@ * Performs the following to release resource for a task: * 1. Release the resource by adding back what the task required. * 2. Remove the TaskAssignResult from _currentAssignments + * Note that if the given quotaType is null, AssignableInstance will try to release from DEFAULT + * type. * @param taskConfig config of this task * @param quotaType quota type this task belongs to */ - public void release(TaskConfig taskConfig, String quotaType) { + public synchronized boolean release(TaskConfig taskConfig, String quotaType) { if (!_currentAssignments.contains(taskConfig.getId())) { logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(), _instanceConfig.getInstanceName()); - return; + return false; } + if (quotaType == null) { + logger.warn("Task {}'s quotaType is null. Trying to release as DEFAULT type.", + taskConfig.getId()); + quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE; + } + String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(); // We might be releasing a task whose resource requirement / quota type is outdated, @@ -366,10 +395,12 @@ && _usedCapacity.get(resourceType).containsKey(quotaType)) { int curUsage = _usedCapacity.get(resourceType).get(quotaType); _usedCapacity.get(resourceType).put(quotaType, curUsage - 1); + _currentAssignments.remove(taskConfig.getId()); + logger.info("Released task {} from instance {}", taskConfig.getId(), + _instanceConfig.getInstanceName()); + return true; } - _currentAssignments.remove(taskConfig.getId()); - logger.info("Released task {} from instance {}", taskConfig.getId(), - _instanceConfig.getInstanceName()); + return false; } /** http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java index fee54e5..5d749fb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java +++ b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java @@ -71,6 +71,12 @@ logger.warn("No instance to assign!"); return buildNoInstanceAssignment(tasks, quotaType); } + if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) { + // Sometimes null is stored as a String literal + logger.warn("Quota type is null or empty. Assigning it as DEFAULT type!"); + quotaType = DEFAULT_QUOTA_TYPE; + } + logger.info("Assigning tasks with quota type {}", quotaType); // Build a scheduling queue @@ -87,6 +93,9 @@ continue; } + // TODO: Review this logic + // TODO: 1.
It assumes that the only mode of failure is insufficient capacity. This assumption may not always be true and should be verified. + // TODO: 2. All TaskAssignResults get the failureReason/Description/TaskID of the first task that failed. This needs to be corrected. // Every time we try to assign the task to the least-used instance, if that fails, // we assume all subsequent tasks will fail with the same reason if (lastFailure != null) { http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java index 3ad1062..9861518 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java @@ -18,7 +18,7 @@ public class TestBatchEnableInstances extends TaskTestBase { _numDbs = 1; _numReplicas = 3; _numNodes = 5; - _numParitions = 4; + _numPartitions = 4; super.beforeClass(); _accessor = new ConfigAccessor(_gZkClient); } @@ -31,7 +31,7 @@ public class TestBatchEnableInstances extends TaskTestBase { ExternalView externalView = _gSetupTool.getClusterManagementTool() .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); - Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numPartitions); for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName())); } @@ -48,7 +48,7 @@ public class TestBatchEnableInstances extends TaskTestBase { ExternalView externalView = _gSetupTool.getClusterManagementTool() .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); - Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numPartitions); for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName())); Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName())); } @@ -69,7 +69,7 @@ public class TestBatchEnableInstances extends TaskTestBase { ExternalView externalView = _gSetupTool.getClusterManagementTool() .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); - Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numPartitions); int numOfFirstHost = 0; for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { if (stateMap.keySet().contains(_participants[0].getInstanceName())) { @@ -92,7 +92,7 @@ public class TestBatchEnableInstances extends TaskTestBase { ExternalView externalView = _gSetupTool.getClusterManagementTool() .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); - Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numPartitions); int numOfFirstHost = 0; for (Map<String, String> stateMap :
externalView.getRecord().getMapFields().values()) { if (stateMap.keySet().contains(_participants[0].getInstanceName())) { @@ -105,4 +105,4 @@ Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()), true); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java index a2c87c0..8e669ac 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java @@ -59,7 +59,7 @@ public class TestStateTransitionCancellation extends TaskTestBase { public void beforeClass() throws Exception { _participants = new MockParticipantManager[_numNodes]; _numDbs = 1; - _numParitions = 20; + _numPartitions = 20; _numNodes = 2; _numReplicas = 2; _verifier = @@ -174,7 +174,7 @@ public class TestStateTransitionCancellation extends TaskTestBase { // Either some of the state transitions have been cancelled or all the Slave -> Master // transitions have been reassigned to other instances in the cluster - Assert.assertTrue((numOfMasters > 0 && numOfMasters <= _numParitions)); + Assert.assertTrue((numOfMasters > 0 && numOfMasters <= _numPartitions)); } private void stateCleanUp() { @@ -292,4 +292,4 @@ } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java index e643c9a..daa3fc4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java @@ -19,7 +19,7 @@ public class TestClusterMaintenanceMode extends TaskTestBase { _numDbs = 1; _numNodes = 3; _numReplicas = 3; - _numParitions = 5; + _numPartitions = 5; super.beforeClass(); } @@ -89,4 +89,4 @@ } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java index 650456b..deca2ea 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java @@ -39,7 +39,7 @@ public class TestTargetExternalView extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception {
_numDbs = 3; - _numParitions = 8; + _numPartitions = 8; _numNodes = 4; _numReplicas = 2; super.beforeClass(); @@ -92,4 +92,4 @@ public class TestTargetExternalView extends TaskTestBase { idealStates.get(i).getRecord().getListFields()); } } -} +} \ No newline at end of file
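
The TaskAssignResult-to-partition conversion added to JobRebalancer above is easiest to follow in isolation. Below is a minimal, self-contained sketch of the same technique; AssignResultStub and toTaskAssignment are illustrative stand-ins, not the actual Helix classes:

import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Minimal stand-in for the relevant fields of a TaskAssignResult.
class AssignResultStub {
  final String instanceName;
  final String taskId;
  final boolean successful;

  AssignResultStub(String instanceName, String taskId, boolean successful) {
    this.instanceName = instanceName;
    this.taskId = taskId;
    this.successful = successful;
  }
}

public class AssignmentConversionSketch {
  // Convert per-task assignment results into instance -> sorted partition numbers,
  // skipping failed results and tasks missing from the taskId -> partition mapping.
  static Map<String, SortedSet<Integer>> toTaskAssignment(
      Map<String, AssignResultStub> results, Map<String, Integer> taskIdPartitionMap) {
    Map<String, SortedSet<Integer>> taskAssignment = new HashMap<>();
    for (AssignResultStub result : results.values()) {
      if (!result.successful || !taskIdPartitionMap.containsKey(result.taskId)) {
        continue; // mirrors the warn-and-skip behavior in the patch above
      }
      taskAssignment.computeIfAbsent(result.instanceName, k -> new TreeSet<>())
          .add(taskIdPartitionMap.get(result.taskId));
    }
    return taskAssignment;
  }

  public static void main(String[] args) {
    Map<String, AssignResultStub> results = new HashMap<>();
    results.put("t0", new AssignResultStub("host_12918", "t0", true));
    results.put("t1", new AssignResultStub("host_12919", "t1", true));
    results.put("t2", new AssignResultStub("host_12918", "t2", false)); // dropped
    Map<String, Integer> taskIdPartitionMap = new HashMap<>();
    taskIdPartitionMap.put("t0", 0);
    taskIdPartitionMap.put("t1", 1);
    // e.g. {host_12918=[0], host_12919=[1]} (HashMap iteration order may vary)
    System.out.println(toTaskAssignment(results, taskIdPartitionMap));
  }
}

Using computeIfAbsent with a TreeSet keeps each instance's partition numbers sorted on insertion, which is what the SortedSet<Integer> in TaskAssignmentCalculator's result format requires.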

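Likewise, the capacity checks this patch adds to AssignableInstance.tryAssign() and release() reduce to per-quota-type bookkeeping. The following simplified, single-resource-type sketch (field and method shapes here are illustrative assumptions, not the real AssignableInstance API) mirrors the null-quota-type fallback to DEFAULT and the boolean release() result introduced above:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class QuotaBookkeepingSketch {
  static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
  private final Map<String, Integer> totalCapacity = new HashMap<>();
  private final Map<String, Integer> usedCapacity = new HashMap<>();
  private final Set<String> currentAssignments = new HashSet<>();

  QuotaBookkeepingSketch(Map<String, Integer> capacity) {
    totalCapacity.putAll(capacity);
    for (String quotaType : capacity.keySet()) {
      usedCapacity.put(quotaType, 0);
    }
  }

  // Returns true iff the task fits; null/empty quota types fall back to DEFAULT,
  // mirroring the patched tryAssign() behavior.
  synchronized boolean tryAssign(String taskId, String quotaType) {
    if (quotaType == null || quotaType.isEmpty()) {
      quotaType = DEFAULT_QUOTA_TYPE;
    }
    if (currentAssignments.contains(taskId) || !totalCapacity.containsKey(quotaType)
        || usedCapacity.get(quotaType) >= totalCapacity.get(quotaType)) {
      return false; // already assigned, unknown quota type, or insufficient quota
    }
    usedCapacity.put(quotaType, usedCapacity.get(quotaType) + 1);
    currentAssignments.add(taskId);
    return true;
  }

  // Returns true only if usage was actually decremented, as in the patched release().
  synchronized boolean release(String taskId, String quotaType) {
    if (!currentAssignments.contains(taskId)) {
      return false;
    }
    if (quotaType == null) {
      quotaType = DEFAULT_QUOTA_TYPE;
    }
    if (!usedCapacity.containsKey(quotaType)) {
      return false; // possibly an outdated quota type; keep the assignment record
    }
    usedCapacity.put(quotaType, usedCapacity.get(quotaType) - 1);
    currentAssignments.remove(taskId);
    return true;
  }

  public static void main(String[] args) {
    Map<String, Integer> capacity = new HashMap<>();
    capacity.put("DEFAULT", 1);
    QuotaBookkeepingSketch instance = new QuotaBookkeepingSketch(capacity);
    System.out.println(instance.tryAssign("task_0", null)); // true (falls back to DEFAULT)
    System.out.println(instance.tryAssign("task_1", null)); // false (quota exhausted)
    System.out.println(instance.release("task_0", null));   // true
  }
}

Marking both methods synchronized, as the patch does for the real class, keeps the check-then-increment in tryAssign() atomic so concurrent assigners cannot oversubscribe a quota type.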