Repository: helix Updated Branches: refs/heads/master 442cd096d -> 84c2feabb
Create AbstractTaskDispatcher for the future Task Framework. Refactor existing code logic by moving the logic that will be needed into AbstractTaskDispatcher. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/84c2feab Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/84c2feab Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/84c2feab Branch: refs/heads/master Commit: 84c2feabbc391e4069717d2a3c5b8c4adf32d946 Parents: 442cd09 Author: Junkai Xue <[email protected]> Authored: Fri May 11 11:44:11 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Jul 9 15:26:25 2018 -0700 ---------------------------------------------------------------------- .../helix/task/AbstractTaskDispatcher.java | 677 +++++++++++++++++++ .../org/apache/helix/task/JobRebalancer.java | 654 +++--------------- .../org/apache/helix/task/TaskRebalancer.java | 175 +---- .../java/org/apache/helix/task/TaskUtil.java | 63 +- .../apache/helix/task/WorkflowRebalancer.java | 2 +- 5 files changed, 827 insertions(+), 744 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java new file mode 100644 index 0000000..b447cf3 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -0,0 +1,677 @@ +package org.apache.helix.task; + +import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Message; +import org.apache.helix.model.Partition; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTaskDispatcher { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTaskDispatcher.class); + + // For connection management + protected HelixManager _manager; + protected static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler(); + protected ClusterStatusMonitor _clusterStatusMonitor; + + public void init(HelixManager manager) { + _manager = manager; + } + + // Job Update related methods + + public void updatePreviousAssignedTasksStatus( + Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances, + String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg, + ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState, + Set<Integer> assignedPartitions, Set<Integer> partitionsToDropFromIs, + Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState, + Set<Integer> skippedPartitions) { + // Iterate through all instances + for (String instance : prevInstanceToTaskAssignments.keySet())
{ + if (excludedInstances.contains(instance)) { + continue; + } + + Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance); + // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, + // TASK_ERROR, ERROR. + Set<Integer> donePartitions = new TreeSet<>(); + for (int pId : pSet) { + final String pName = pName(jobResource, pId); + TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput, jobResource, pId, pName, + instance, jobCtx); + + // Check for pending state transitions on this (partition, instance). + Message pendingMessage = + currStateOutput.getPendingState(jobResource, new Partition(pName), instance); + if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) { + processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance, + pendingMessage, jobState, currState, paMap, assignedPartitions); + continue; + } + + // Process any requested state transitions. + String requestedStateStr = + currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); + if (requestedStateStr != null && !requestedStateStr.isEmpty()) { + TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr); + if (requestedState.equals(currState)) { + LOG.warn(String.format( + "Requested state %s is the same as the current state for instance %s.", + requestedState, instance)); + } + + paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Instance %s requested a state transition to %s for partition %s.", instance, + requestedState, pName)); + } + continue; + } + + switch (currState) { + case RUNNING: { + TaskPartitionState nextState = TaskPartitionState.RUNNING; + if (jobState.equals(TaskState.TIMING_OUT)) { + nextState = TaskPartitionState.TASK_ABORTED; + } else if (jobTgtState.equals(TargetState.STOP)) { + nextState = TaskPartitionState.STOPPED; + } + + paMap.put(pId, new PartitionAssignment(instance, nextState.name())); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Setting task partition %s state to %s on instance %s.", pName, nextState, + instance)); + } + } + break; + case STOPPED: { + TaskPartitionState nextState; + if (jobTgtState.equals(TargetState.START)) { + nextState = TaskPartitionState.RUNNING; + } else { + nextState = TaskPartitionState.STOPPED; + } + + paMap.put(pId, new PartitionAssignment(instance, nextState.name())); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Setting task partition %s state to %s on instance %s.", pName, nextState, + instance)); + } + } + break; + case COMPLETED: { + // The task has completed on this partition. Mark as such in the context object. + donePartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has completed with state %s. Marking as such in rebalancer context.", + pName, currState)); + } + partitionsToDropFromIs.add(pId); + markPartitionCompleted(jobCtx, pId); + } + break; + case TIMED_OUT: + case TASK_ERROR: + case TASK_ABORTED: + case ERROR: { + donePartitions.add(pId); // The task may be rescheduled on a different instance. + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", + pName, currState, jobCtx.getPartitionInfo(pId))); + } + markPartitionError(jobCtx, pId, currState, true); + // The error policy is to fail the task as soon as a single partition fails for a specified + // maximum number of attempts or the task is in ABORTED state. + // But notice that if the job is TIMED_OUT, aborted tasks won't be treated as failed and won't cause the job to fail. + // After all tasks are aborted, they will be dropped, because of job timeout. + if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) { + if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() + || currState.equals(TaskPartitionState.TASK_ABORTED) + || currState.equals(TaskPartitionState.ERROR)) { + skippedPartitions.add(pId); + partitionsToDropFromIs.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug("skippedPartitions: " + skippedPartitions); + } + } else { + // Mark the task to be started at some later time (if enabled) + markPartitionDelayed(jobCfg, jobCtx, pId); + } + } + } + break; + case INIT: + case DROPPED: { + // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. + donePartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has state %s. It will be dropped from the current ideal state.", + pName, currState)); + } + } + break; + default: + throw new AssertionError("Unknown enum symbol: " + currState); + } + } + + // Remove the set of task partitions that are completed or in one of the error states. + pSet.removeAll(donePartitions); + } + } + + /** + * Computes the partition name given the resource name and partition id. + */ + protected String pName(String resource, int pId) { + return resource + "_" + pId; + } + + /** + * An (instance, state) pair. + */ + protected static class PartitionAssignment { + public final String _instance; + public final String _state; + + PartitionAssignment(String instance, String state) { + _instance = instance; + _state = state; + } + } + + private TaskPartitionState updateJobContextAndGetTaskCurrentState(CurrentStateOutput currentStateOutput, + String jobResource, Integer pId, String pName, String instance, JobContext jobCtx) { + String currentStateString = currentStateOutput.getCurrentState(jobResource, new Partition( + pName), instance); + if (currentStateString == null) { + // Task state is either DROPPED or INIT + return jobCtx.getPartitionState(pId); + } + TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString); + jobCtx.setPartitionState(pId, currentState); + String taskMsg = currentStateOutput.getInfo(jobResource, new Partition( + pName), instance); + if (taskMsg != null) { + jobCtx.setPartitionInfo(pId, taskMsg); + } + return currentState; + } + + private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId, String pName, + String instance, Message pendingMessage, TaskState jobState, TaskPartitionState currState, + Map<Integer, PartitionAssignment> paMap, Set<Integer> assignedPartitions) { + + Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); + if (stateMap != null) { + String prevState = stateMap.get(instance); + if (!pendingMessage.getToState().equals(prevState)) { + LOG.warn(String.format("Task pending to-state is %s while previous assigned state is %s. This should not " + "happen.", pendingMessage.getToState(), prevState)); + } + if (jobState == TaskState.TIMING_OUT + && currState == TaskPartitionState.INIT + && prevState.equals(TaskPartitionState.RUNNING.name())) { + // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT, + // so that Helix will cancel the transition. + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name())); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has a pending state transition on instance %s INIT->RUNNING. " + + "Setting it back to INIT so that Helix can cancel the transition (if enabled).", + pName, instance)); + } + } else { + // Otherwise, just copy forward + // the state assignment from the previous ideal state. + paMap.put(pId, new PartitionAssignment(instance, prevState)); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.", + pName, instance, prevState)); + } + } + } + } + + protected static void markPartitionCompleted(JobContext ctx, int pId) { + ctx.setPartitionState(pId, TaskPartitionState.COMPLETED); + ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); + ctx.incrementNumAttempts(pId); + } + + protected static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state, + boolean incrementAttempts) { + ctx.setPartitionState(pId, state); + ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); + if (incrementAttempts) { + ctx.incrementNumAttempts(pId); + } + } + + protected static void markAllPartitionsError(JobContext ctx, TaskPartitionState state, + boolean incrementAttempts) { + for (int pId : ctx.getPartitionSet()) { + markPartitionError(ctx, pId, state, incrementAttempts); + } + } + + protected static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) { + long delayInterval = cfg.getTaskRetryDelay(); + if (delayInterval <= 0) { + return; + } + long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval; + ctx.setNextRetryTime(p, nextStartTime); + } + + protected void handleJobTimeout(JobContext jobCtx, WorkflowContext workflowCtx, + String jobResource, JobConfig jobCfg) { + jobCtx.setFinishTime(System.currentTimeMillis()); + workflowCtx.setJobState(jobResource, TaskState.TIMED_OUT); + // Mark all INIT tasks as TASK_ABORTED + for (int pId : jobCtx.getPartitionSet()) { + if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT) { + jobCtx.setPartitionState(pId, TaskPartitionState.TASK_ABORTED); + } + } + _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT); + _rebalanceScheduler.removeScheduledRebalance(jobResource); + TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + } + + protected void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext, + WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap) { + markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap); + // Mark all INIT tasks as TASK_ABORTED + for (int pId : jobContext.getPartitionSet()) { + if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) { + jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED); + } + } + _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED); + _rebalanceScheduler.removeScheduledRebalance(jobName); +
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); + } + + // Compute real assignment from theoretical calculation with applied throttling or other logic + protected void handleAdditionalTaskAssignment( + Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances, + String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg, + WorkflowConfig workflowConfig, WorkflowContext workflowCtx, ClusterDataCache cache, + ResourceAssignment prevTaskToInstanceStateAssignment, Set<Integer> assignedPartitions, + Map<Integer, PartitionAssignment> paMap, Set<Integer> skippedPartitions, + TaskAssignmentCalculator taskAssignmentCal, Set<Integer> allPartitions, long currentTime, + Collection<String> liveInstances) { + // The excludeSet contains the set of task partitions that must be excluded from consideration + // when making any new assignments. + // This includes all completed, failed, delayed, and already assigned partitions. + Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); + addCompletedTasks(excludeSet, jobCtx, allPartitions); + addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg); + excludeSet.addAll(skippedPartitions); + excludeSet.addAll(TaskUtil.getNonReadyPartitions(jobCtx, currentTime)); + // Get instance->[partition, ...] mappings for the target resource. + Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal + .getTaskAssignment(currStateOutput, prevTaskToInstanceStateAssignment, liveInstances, + jobCfg, jobCtx, workflowConfig, workflowCtx, allPartitions, cache.getIdealStates()); + + if (!TaskUtil.isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) { + dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap, + jobCtx); + } + + for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet()) { + String instance = entry.getKey(); + if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) { + continue; + } + // 1. throttled by job configuration + // Contains the set of task partitions currently assigned to the instance. + Set<Integer> pSet = entry.getValue(); + int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size(); + // 2. throttled by participant capacity + int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask(); + if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) { + participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance(); + } + int participantLimitation = + participantCapacity - cache.getParticipantActiveTaskCount(instance); + // New tasks to be assigned + int numToAssign = Math.min(jobCfgLimitation, participantLimitation); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), " + + "Participant Max Task(%d). Remaining capacity %d.", instance, jobCfgLimitation, + participantCapacity, numToAssign)); + } + if (numToAssign > 0) { + Set<Integer> throttledSet = new HashSet<Integer>(); + List<Integer> nextPartitions = + getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, throttledSet, + numToAssign); + for (Integer pId : nextPartitions) { + String pName = pName(jobResource, pId); + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); + excludeSet.add(pId); + jobCtx.setAssignedParticipant(pId, instance); + jobCtx.setPartitionState(pId, TaskPartitionState.INIT); + jobCtx.setPartitionStartTime(pId, System.currentTimeMillis()); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, + TaskPartitionState.RUNNING, instance)); + } + } + cache.setParticipantActiveTaskCount(instance, + cache.getParticipantActiveTaskCount(instance) + nextPartitions.size()); + if (!throttledSet.isEmpty()) { + LOG.debug( + throttledSet.size() + " tasks are ready but throttled when assigned to participant."); + } + } + } + } + + protected void scheduleForNextTask(String job, JobContext jobCtx, long now) { + // Figure out the earliest schedulable time in the future of a non-complete job + boolean shouldSchedule = false; + long earliestTime = Long.MAX_VALUE; + for (int p : jobCtx.getPartitionSet()) { + long retryTime = jobCtx.getNextRetryTime(p); + TaskPartitionState state = jobCtx.getPartitionState(p); + state = (state != null) ? state : TaskPartitionState.INIT; + Set<TaskPartitionState> errorStates = + Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR, + TaskPartitionState.TIMED_OUT); + if (errorStates.contains(state) && retryTime > now && retryTime < earliestTime) { + earliestTime = retryTime; + shouldSchedule = true; + } + } + + // If any was found, then schedule it + if (shouldSchedule) { + long scheduledTime = _rebalanceScheduler.getRebalanceTime(job); + if (scheduledTime == -1 || earliestTime < scheduledTime) { + _rebalanceScheduler.scheduleRebalance(_manager, job, earliestTime); + } + } + } + + + + // Add all partitions that have been tried maxNumberAttempts times + protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds, + JobConfig cfg) { + for (Integer pId : pIds) { + if (isTaskGivenup(ctx, cfg, pId)) { + set.add(pId); + } + } + } + + private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, + Set<Integer> excluded, Set<Integer> throttled, int n) { + List<Integer> result = new ArrayList<Integer>(); + for (Integer pId : candidatePartitions) { + if (!excluded.contains(pId)) { + if (result.size() < n) { + result.add(pId); + } else { + throttled.add(pId); + } + } + } + return result; + } + + private static void addCompletedTasks(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds) { + for (Integer pId : pIds) { + TaskPartitionState state = ctx.getPartitionState(pId); + if (state == TaskPartitionState.COMPLETED) { + set.add(pId); + } + } + } + + protected static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) { + TaskPartitionState state = ctx.getPartitionState(pId); + if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) { + return true; + } + if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) { + return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask(); + } + return false; + } + + + /** + * If assignment is different from previous assignment, drop the old running task if it's no + * longer assigned to the same instance, but not removing it from excludeSet because the same task + * should not be assigned to the new instance right away. + */ + private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment, + Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap, + JobContext jobContext) { + for (String instance : oldAssignment.keySet()) { + for (Integer pId : oldAssignment.get(instance)) { + if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING && !newAssignment + .get(instance).contains(pId)) { + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); + jobContext.setPartitionState(pId, TaskPartitionState.DROPPED); + } + } + } + } + + protected void markJobComplete(String jobName, JobContext jobContext, + WorkflowConfig workflowConfig, WorkflowContext workflowContext, + Map<String, JobConfig> jobConfigMap) { + long currentTime = System.currentTimeMillis(); + workflowContext.setJobState(jobName, TaskState.COMPLETED); + jobContext.setFinishTime(currentTime); + if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) { + workflowContext.setFinishTime(currentTime); + updateWorkflowMonitor(workflowContext, workflowConfig); + } + scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime); + } + + protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, + WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) { + long currentTime = System.currentTimeMillis(); + workflowContext.setJobState(jobName, TaskState.FAILED); + if (jobContext != null) { + jobContext.setFinishTime(currentTime); + } + if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) { + workflowContext.setFinishTime(currentTime); + updateWorkflowMonitor(workflowContext, workflowConfig); + } + scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime); + } + + protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig, + long currentTime) { + long currentScheduledTime = + _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 + ? Long.MAX_VALUE + : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()); + if (currentTime + jobConfig.getExpiry() < currentScheduledTime) { + _rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(), + currentTime + jobConfig.getExpiry()); + } + } + + + + + // Workflow related methods + + /** + * Checks if the workflow has finished (either completed or failed). + * Set the state in the workflow context properly. + * + * @param ctx Workflow context containing job states + * @param cfg Workflow config containing set of jobs + * @return returns true if the workflow + * 1. completed (all jobs are {@link TaskState#COMPLETED}) + * 2. failed (any job is {@link TaskState#FAILED}) + * 3. workflow is {@link TaskState#TIMED_OUT} + * returns false otherwise. + */ + protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg, + Map<String, JobConfig> jobConfigMap) { + boolean incomplete = false; + + TaskState workflowState = ctx.getWorkflowState(); + if (TaskState.TIMED_OUT.equals(workflowState)) { + // We don't update job state here as JobRebalancer will do it + return true; + } + + // Check if failed job count is beyond threshold and if so, fail the workflow + // and abort in-progress jobs + int failedJobs = 0; + for (String job : cfg.getJobDag().getAllNodes()) { + TaskState jobState = ctx.getJobState(job); + if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) { + failedJobs++; + if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) { + ctx.setWorkflowState(TaskState.FAILED); + for (String jobToFail : cfg.getJobDag().getAllNodes()) { + if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) { + ctx.setJobState(jobToFail, TaskState.ABORTED); + // Skip aborted jobs' latency since it is not an accurate measure of job running time + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor + .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED); + } + } + } + return true; + } + } + if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED + && jobState != TaskState.TIMED_OUT) { + incomplete = true; + } + } + + if (!incomplete && cfg.isTerminable()) { + ctx.setWorkflowState(TaskState.COMPLETED); + return true; + } + + return false; + } + + protected void updateWorkflowMonitor(WorkflowContext context, + WorkflowConfig config) { + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(), + context.getFinishTime() - context.getStartTime()); + } + } + + + // Common methods + + protected Set<String> getExcludedInstances(String currentJobName, WorkflowConfig workflowCfg, + ClusterDataCache cache) { + Set<String> ret = new HashSet<String>(); + + if (!workflowCfg.isAllowOverlapJobAssignment()) { + // exclude all instances that have been assigned other jobs' tasks + for (String jobName : workflowCfg.getJobDag().getAllNodes()) { + if (jobName.equals(currentJobName)) { + continue; + } + JobContext jobContext = cache.getJobContext(jobName); + if (jobContext == null) { + continue; + } + for (int pId : jobContext.getPartitionSet()) { + TaskPartitionState partitionState = jobContext.getPartitionState(pId); + if (partitionState == TaskPartitionState.INIT + || partitionState == TaskPartitionState.RUNNING) { + ret.add(jobContext.getAssignedParticipant(pId)); + } + } + } + } + return ret; + } + + /** + * Schedule the rebalancer timer for task framework elements + * @param resourceId The resource id + * @param startTime The resource start time + * @param timeoutPeriod The resource timeout period. Will be -1 if it is not set. + */ + protected void scheduleRebalanceForTimeout(String resourceId, long startTime, + long timeoutPeriod) { + long nextTimeout = getTimeoutTime(startTime, timeoutPeriod); + long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId); + if (nextTimeout >= System.currentTimeMillis() && ( + nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT + || nextTimeout < nextRebalanceTime)) { + _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout); + } + } + + /** + * Basic function to check whether task framework resources (workflow and job) have timed out + * @param startTime Resources start time + * @param timeoutPeriod Resources timeout period. Will be -1 if it is not set. + * @return true if the resource has timed out, false otherwise + */ + protected boolean isTimeout(long startTime, long timeoutPeriod) { + long nextTimeout = getTimeoutTime(startTime, timeoutPeriod); + return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT && nextTimeout <= System + .currentTimeMillis(); + } + + private long getTimeoutTime(long startTime, long timeoutPeriod) { + return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT + || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow + ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod; + } + + + /** + * Set the ClusterStatusMonitor for metrics update + */ + public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) { + _clusterStatusMonitor = clusterStatusMonitor; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index fd80229..5f05acc 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -36,6 +36,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; @@ -64,8 +65,9 @@ public class JobRebalancer extends TaskRebalancer { private static final String PREV_RA_NODE = "PreviousResourceAssignment"; @Override - public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, - IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { + public ResourceAssignment computeBestPossiblePartitionState( + ClusterDataCache clusterData, IdealState taskIs, Resource resource, + CurrentStateOutput currStateOutput) { final String jobName = resource.getResourceName(); LOG.debug("Computer Best Partition for job: " + jobName); @@ -101,8 +103,8 @@ public class JobRebalancer extends TaskRebalancer { TaskState workflowState = workflowCtx.getWorkflowState(); TaskState jobState = workflowCtx.getJobState(jobName); // The job is already in a final state (completed/failed).
- if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED || - jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { + if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED + || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { LOG.info(String.format( "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.", workflowResource, jobName, workflowState, jobState)); @@ -116,8 +118,8 @@ public class JobRebalancer extends TaskRebalancer { return buildEmptyAssignment(jobName, currStateOutput); } - if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg, - workflowCtx, getInCompleteJobCount(workflowCfg, workflowCtx), + if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg, + workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx), clusterData.getJobConfigMap())) { LOG.info("Job is not ready to run " + jobName); return buildEmptyAssignment(jobName, currStateOutput); @@ -154,10 +156,40 @@ public class JobRebalancer extends TaskRebalancer { LOG.error("No available instance found for job!"); } - Set<Integer> partitionsToDrop = new TreeSet<Integer>(); + TargetState jobTgtState = workflowCfg.getTargetState(); + jobState = workflowCtx.getJobState(jobName); + workflowState = workflowCtx.getWorkflowState(); + + if (jobState == TaskState.IN_PROGRESS && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout()) + || TaskState.TIMED_OUT.equals(workflowState))) { + jobState = TaskState.TIMING_OUT; + workflowCtx.setJobState(jobName, TaskState.TIMING_OUT); + } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) { + // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted + // Update running status in workflow context + if (jobTgtState == TargetState.STOP) { + if (TaskUtil.checkJobStopped(jobCtx)) { + workflowCtx.setJobState(jobName, TaskState.STOPPED); + } else { + workflowCtx.setJobState(jobName, TaskState.STOPPING); + } + // Workflow has been stopped if all in progress jobs are stopped + if (isWorkflowStopped(workflowCtx, workflowCfg)) { + workflowCtx.setWorkflowState(TaskState.STOPPED); + } else { + workflowCtx.setWorkflowState(TaskState.STOPPING); + } + } else { + workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS); + // Workflow is in progress if any task is in progress + workflowCtx.setWorkflowState(TaskState.IN_PROGRESS); + } + } + + Set<Integer> partitionsToDrop = new TreeSet<>(); ResourceAssignment newAssignment = - computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances, - currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData); + computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, prevAssignment, + liveInstances, currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName); @@ -181,74 +213,21 @@ public class JobRebalancer extends TaskRebalancer { return newAssignment; } - private Set<String> getExcludedInstances(String currentJobName, - WorkflowConfig workflowCfg, ClusterDataCache cache) { - Set<String> ret = new HashSet<String>(); - - if (!workflowCfg.isAllowOverlapJobAssignment()) { - // exclude all instances that has been assigned other jobs' tasks - for (String jobName : workflowCfg.getJobDag().getAllNodes()) { - if 
(jobName.equals(currentJobName)) { - continue; - } - JobContext jobContext = cache.getJobContext(jobName); - if (jobContext == null) { - continue; - } - for (int pId : jobContext.getPartitionSet()) { - TaskPartitionState partitionState = jobContext.getPartitionState(pId); - if (partitionState == TaskPartitionState.INIT || partitionState == TaskPartitionState.RUNNING) { - ret.add(jobContext.getAssignedParticipant(pId)); - } - } - } - } - return ret; - } - private ResourceAssignment computeResourceMapping(String jobResource, - WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevTaskToInstanceStateAssignment, - Collection<String> liveInstances, CurrentStateOutput currStateOutput, - WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs, - ClusterDataCache cache) { - TargetState jobTgtState = workflowConfig.getTargetState(); - TaskState jobState = workflowCtx.getJobState(jobResource); - TaskState workflowState = workflowCtx.getWorkflowState(); + WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState, + ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> liveInstances, + CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx, + Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) { - if (jobState == TaskState.IN_PROGRESS && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout()) - || TaskState.TIMED_OUT.equals(workflowState))) { - jobState = TaskState.TIMING_OUT; - workflowCtx.setJobState(jobResource, TaskState.TIMING_OUT); - } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) { - // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted - // Update running status in workflow context - if (jobTgtState == TargetState.STOP) { - if (checkJobStopped(jobCtx)) { - workflowCtx.setJobState(jobResource, TaskState.STOPPED); - } else { - workflowCtx.setJobState(jobResource, TaskState.STOPPING); - } - // Workflow has been stopped if all in progress jobs are stopped - if (isWorkflowStopped(workflowCtx, workflowConfig)) { - workflowCtx.setWorkflowState(TaskState.STOPPED); - } else { - workflowCtx.setWorkflowState(TaskState.STOPPING); - } - } else { - workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS); - // Workflow is in progress if any task is in progress - workflowCtx.setWorkflowState(TaskState.IN_PROGRESS); - } - } // Used to keep track of tasks that have already been assigned to instances. - Set<Integer> assignedPartitions = new HashSet<Integer>(); + Set<Integer> assignedPartitions = new HashSet<>(); // Used to keep track of tasks that have failed, but whose failure is acceptable - Set<Integer> skippedPartitions = new HashSet<Integer>(); + Set<Integer> skippedPartitions = new HashSet<>(); // Keeps a mapping of (partition) -> (instance, state) - Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>(); + Map<Integer, PartitionAssignment> paMap = new TreeMap<>(); Set<String> excludedInstances = getExcludedInstances(jobResource, workflowConfig, cache); @@ -259,7 +238,8 @@ public class JobRebalancer extends TaskRebalancer { if (allPartitions == null || allPartitions.isEmpty()) { // Empty target partitions, mark the job as FAILED. 
- String failureMsg = "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!"; + String failureMsg = + "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!"; LOG.info(failureMsg); jobCtx.setInfo(failureMsg); failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap()); @@ -268,7 +248,8 @@ public class JobRebalancer extends TaskRebalancer { } Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments = - getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment, allPartitions); + getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment, + allPartitions); long currentTime = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { @@ -277,152 +258,14 @@ public class JobRebalancer extends TaskRebalancer { + " excludedInstances: " + excludedInstances); } - // Iterate through all instances - for (String instance : prevInstanceToTaskAssignments.keySet()) { - if (excludedInstances.contains(instance)) { - continue; - } - - Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance); - // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, - // TASK_ERROR, ERROR. - Set<Integer> donePartitions = new TreeSet<Integer>(); - for (int pId : pSet) { - final String pName = pName(jobResource, pId); - TaskPartitionState currState = - updateJobContextAndGetTaskCurrentState(currStateOutput, jobResource, pId, pName, instance, jobCtx); - - // Check for pending state transitions on this (partition, instance). - Message pendingMessage = - currStateOutput.getPendingState(jobResource, new Partition(pName), instance); - if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) { - processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance, - pendingMessage, jobState, currState, paMap, assignedPartitions); - continue; - } - - // Process any requested state transitions. 
- String requestedStateStr = - currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); - if (requestedStateStr != null && !requestedStateStr.isEmpty()) { - TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr); - if (requestedState.equals(currState)) { - LOG.warn(String.format( - "Requested state %s is the same as the current state for instance %s.", - requestedState, instance)); - } - - paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Instance %s requested a state transition to %s for partition %s.", instance, - requestedState, pName)); - } - continue; - } - - switch (currState) { - case RUNNING: { - TaskPartitionState nextState = TaskPartitionState.RUNNING; - if (jobState == TaskState.TIMING_OUT) { - nextState = TaskPartitionState.TASK_ABORTED; - } else if (jobTgtState == TargetState.STOP) { - nextState = TaskPartitionState.STOPPED; - } - - paMap.put(pId, new PartitionAssignment(instance, nextState.name())); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Setting task partition %s state to %s on instance %s.", pName, nextState, - instance)); - } - } - break; - case STOPPED: { - TaskPartitionState nextState; - if (jobTgtState == TargetState.START) { - nextState = TaskPartitionState.RUNNING; - } else { - nextState = TaskPartitionState.STOPPED; - } - - paMap.put(pId, new PartitionAssignment(instance, nextState.name())); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Setting task partition %s state to %s on instance %s.", pName, nextState, - instance)); - } - } - break; - case COMPLETED: { - // The task has completed on this partition. Mark as such in the context object. - donePartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has completed with state %s. Marking as such in rebalancer context.", - pName, currState)); - } - partitionsToDropFromIs.add(pId); - markPartitionCompleted(jobCtx, pId); - } - break; - case TIMED_OUT: - case TASK_ERROR: - case TASK_ABORTED: - case ERROR: { - donePartitions.add(pId); // The task may be rescheduled on a different instance. - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", - pName, currState, jobCtx.getPartitionInfo(pId))); - } - markPartitionError(jobCtx, pId, currState, true); - // The error policy is to fail the task as soon a single partition fails for a specified - // maximum number of attempts or task is in ABORTED state. - // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't cause job fail. - // After all tasks are aborted, they will be dropped, because of job timeout. - if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) { - if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() - || currState.equals(TaskPartitionState.TASK_ABORTED) - || currState.equals(TaskPartitionState.ERROR)) { - skippedPartitions.add(pId); - partitionsToDropFromIs.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug("skippedPartitions:" + skippedPartitions); - } - } else { - // Mark the task to be started at some later time (if enabled) - markPartitionDelayed(jobCfg, jobCtx, pId); - } - } - } - break; - case INIT: - case DROPPED: { - // currState in [INIT, DROPPED]. 
Do nothing, the partition is eligible to be reassigned. - donePartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has state %s. It will be dropped from the current ideal state.", - pName, currState)); - } - } - break; - default: - throw new AssertionError("Unknown enum symbol: " + currState); - } - } - - // Remove the set of task partitions that are completed or in one of the error states. - pSet.removeAll(donePartitions); - } + updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource, + currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState, + assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions); addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg); - if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()) { + if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg + .getFailureThreshold()) { if (isJobFinished(jobCtx, jobResource, currStateOutput)) { failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap()); return buildEmptyAssignment(jobResource, currStateOutput); @@ -452,8 +295,7 @@ public class JobRebalancer extends TaskRebalancer { } if (isJobComplete(jobCtx, allPartitions, jobCfg)) { - markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, - cache.getJobConfigMap()); + markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap()); _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED, jobCtx.getFinishTime() - jobCtx.getStartTime()); _rebalanceScheduler.removeScheduledRebalance(jobResource); @@ -464,17 +306,7 @@ public class JobRebalancer extends TaskRebalancer { // If job is being timed out and no task is running (for whatever reason), idealState can be deleted and all tasks // can be dropped(note that Helix doesn't track whether the drop is success or not). if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) { - jobCtx.setFinishTime(System.currentTimeMillis()); - workflowCtx.setJobState(jobResource, TaskState.TIMED_OUT); - // Mark all INIT task to TASK_ABORTED - for (int pId : jobCtx.getPartitionSet()) { - if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT) { - jobCtx.setPartitionState(pId, TaskPartitionState.TASK_ABORTED); - } - } - _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT); - _rebalanceScheduler.removeScheduledRebalance(jobResource); - TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg); return buildEmptyAssignment(jobResource, currStateOutput); } @@ -482,80 +314,19 @@ public class JobRebalancer extends TaskRebalancer { scheduleForNextTask(jobResource, jobCtx, currentTime); // Make additional task assignments if needed. - if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT && jobTgtState == TargetState.START) { - // Contains the set of task partitions that must be excluded from consideration when making - // any new assignments. - // This includes all completed, failed, delayed, and already assigned partitions. 
- Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); - addCompletedTasks(excludeSet, jobCtx, allPartitions); - addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg); - excludeSet.addAll(skippedPartitions); - excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime)); - // Get instance->[partition, ...] mappings for the target resource. - Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal - .getTaskAssignment(currStateOutput, prevTaskToInstanceStateAssignment, liveInstances, jobCfg, jobCtx, - workflowConfig, workflowCtx, allPartitions, cache.getIdealStates()); - - if (!isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) { - dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap, - jobCtx); - } - - for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet()) { - String instance = entry.getKey(); - if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances - .contains(instance)) { - continue; - } - // 1. throttled by job configuration - // Contains the set of task partitions currently assigned to the instance. - Set<Integer> pSet = entry.getValue(); - int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size(); - // 2. throttled by participant capacity - int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask(); - if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) { - participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance(); - } - int participantLimitation = participantCapacity - cache.getParticipantActiveTaskCount(instance); - // New tasks to be assigned - int numToAssign = Math.min(jobCfgLimitation, participantLimitation); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), " - + "Participant Max Task(%d). 
Remaining capacity %d.", instance, jobCfgLimitation, - participantCapacity, numToAssign)); - } - if (numToAssign > 0) { - Set<Integer> throttledSet = new HashSet<Integer>(); - List<Integer> nextPartitions = - getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, throttledSet, numToAssign); - for (Integer pId : nextPartitions) { - String pName = pName(jobResource, pId); - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); - excludeSet.add(pId); - jobCtx.setAssignedParticipant(pId, instance); - jobCtx.setPartitionState(pId, TaskPartitionState.INIT); - jobCtx.setPartitionStartTime(pId, System.currentTimeMillis()); - if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Setting task partition %s state to %s on instance %s.", pName, - TaskPartitionState.RUNNING, instance)); - } - } - cache.setParticipantActiveTaskCount(instance, - cache.getParticipantActiveTaskCount(instance) + nextPartitions.size()); - if (!throttledSet.isEmpty()) { - LOG.debug( - throttledSet.size() + "tasks are ready but throttled when assigned to participant."); - } - } - } + if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT + && jobTgtState == TargetState.START) { + handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource, + currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache, + prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions, + taskAssignmentCal, allPartitions, currentTime, liveInstances); } return toResourceAssignment(jobResource, paMap); } - private ResourceAssignment toResourceAssignment(String jobResource, Map<Integer, PartitionAssignment> paMap) { + private ResourceAssignment toResourceAssignment(String jobResource, + Map<Integer, PartitionAssignment> paMap) { // Construct a ResourceAssignment object from the map of partition assignments. ResourceAssignment ra = new ResourceAssignment(jobResource); for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) { @@ -566,95 +337,6 @@ public class JobRebalancer extends TaskRebalancer { return ra; } - /** - * If assignment is different from previous assignment, drop the old running task if it's no - * longer assigned to the same instance, but not removing it from excludeSet because the same task - * should not be assigned to the new instance right way. 
- */ - private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment, - Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap, - JobContext jobContext) { - for (String instance : oldAssignment.keySet()) { - for (Integer pId : oldAssignment.get(instance)) { - if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING - && !newAssignment.get(instance).contains(pId)) { - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); - jobContext.setPartitionState(pId, TaskPartitionState.DROPPED); - } - } - } - } - - private TaskPartitionState updateJobContextAndGetTaskCurrentState(CurrentStateOutput currentStateOutput, - String jobResource, Integer pId, String pName, String instance, JobContext jobCtx) { - String currentStateString = currentStateOutput.getCurrentState(jobResource, new Partition( - pName), instance); - if (currentStateString == null) { - // Task state is either DROPPED or INIT - return jobCtx.getPartitionState(pId); - } - TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString); - jobCtx.setPartitionState(pId, currentState); - String taskMsg = currentStateOutput.getInfo(jobResource, new Partition( - pName), instance); - if (taskMsg != null) { - jobCtx.setPartitionInfo(pId, taskMsg); - } - return currentState; - } - - private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId, String pName, - String instance, Message pendingMessage, TaskState jobState, TaskPartitionState currState, - Map<Integer, PartitionAssignment> paMap, Set<Integer> assignedPartitions) { - - Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); - if (stateMap != null) { - String prevState = stateMap.get(instance); - if (!pendingMessage.getToState().equals(prevState)) { - LOG.warn(String.format("Task pending to-state is %s while previous assigned state is %s. This should not" - + "heppen.", pendingMessage.getToState(), prevState)); - } - if (jobState == TaskState.TIMING_OUT - && currState == TaskPartitionState.INIT - && prevState.equals(TaskPartitionState.RUNNING.name())) { - // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT, - // so that Helix will cancel the transition. - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name())); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has a pending state transition on instance %s INIT->RUNNING. " - + "Setting it back to INIT so that Helix can cancel the transition(if enabled).", - pName, instance, prevState)); - } - } else { - // Otherwise, Just copy forward - // the state assignment from the previous ideal state. - paMap.put(pId, new PartitionAssignment(instance, prevState)); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has a pending state transition on instance %s. 
Using the previous ideal state which was %s.", - pName, instance, prevState)); - } - } - } - } - - private void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext, - WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap) { - markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap); - // Mark all INIT task to TASK_ABORTED - for (int pId : jobContext.getPartitionSet()) { - if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) { - jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED); - } - } - _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED); - _rebalanceScheduler.removeScheduledRebalance(jobName); - TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); - } - private boolean isJobFinished(JobContext jobContext, String jobResource, CurrentStateOutput currentStateOutput) { for (int pId : jobContext.getPartitionSet()) { @@ -663,56 +345,19 @@ public class JobRebalancer extends TaskRebalancer { String instance = jobContext.getAssignedParticipant(pId); Message pendingMessage = currentStateOutput.getPendingState(jobResource, partition, instance); // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished - if (state == TaskPartitionState.RUNNING - || (state == TaskPartitionState.INIT && pendingMessage != null)) { + if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT + && pendingMessage != null)) { return false; } } return true; } - private void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, - WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) { - long currentTime = System.currentTimeMillis(); - workflowContext.setJobState(jobName, TaskState.COMPLETED); - jobContext.setFinishTime(currentTime); - if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) { - workflowContext.setFinishTime(currentTime); - updateWorkflowMonitor(workflowContext, workflowConfig); - } - scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime); - } - - private void scheduleForNextTask(String job, JobContext jobCtx, long now) { - // Figure out the earliest schedulable time in the future of a non-complete job - boolean shouldSchedule = false; - long earliestTime = Long.MAX_VALUE; - for (int p : jobCtx.getPartitionSet()) { - long retryTime = jobCtx.getNextRetryTime(p); - TaskPartitionState state = jobCtx.getPartitionState(p); - state = (state != null) ? 
state : TaskPartitionState.INIT; - Set<TaskPartitionState> errorStates = - Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR, - TaskPartitionState.TIMED_OUT); - if (errorStates.contains(state) && retryTime > now && retryTime < earliestTime) { - earliestTime = retryTime; - shouldSchedule = true; - } - } - - // If any was found, then schedule it - if (shouldSchedule) { - long scheduledTime = _rebalanceScheduler.getRebalanceTime(job); - if (scheduledTime == -1 || earliestTime < scheduledTime) { - _rebalanceScheduler.scheduleRebalance(_manager, job, earliestTime); - } - } - } - /** * Get the last task assignment for a given job * * @param resourceName the name of the job + * * @return {@link ResourceAssignment} instance, or null if no assignment is available */ private ResourceAssignment getPrevResourceAssignment(String resourceName) { @@ -728,18 +373,16 @@ public class JobRebalancer extends TaskRebalancer { * @param resourceName the name of the job * @param ra {@link ResourceAssignment} containing the task assignment */ - private void setPrevResourceAssignment(String resourceName, - ResourceAssignment ra) { + private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) { _manager.getHelixPropertyStore() .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), ra.getRecord(), AccessOption.PERSISTENT); } /** - * Checks if the job has completed. - * Look at states of all tasks of the job, there're 3 kind: completed, given up, not given up. - * The job is completed if all tasks are completed or given up, and the number of given up tasks is within job - * failure threshold. + * Checks if the job has completed. Look at states of all tasks of the job, there are 3 kinds: + * completed, given up, not given up. The job is completed if all tasks are completed or given up, + * and the number of given up tasks is within job failure threshold. */ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, JobConfig cfg) { int numOfGivenUpTasks = 0; @@ -747,7 +390,7 @@ public class JobRebalancer extends TaskRebalancer { for (Integer pId : allPartitions) { TaskPartitionState state = ctx.getPartitionState(pId); if (state != TaskPartitionState.COMPLETED) { - if(!isTaskGivenup(ctx, cfg, pId)) { + if (!isTaskGivenup(ctx, cfg, pId)) { return false; } // If the task is given up, there's still chance the job has completed because of job failure threshold.
-  private static void addCompletedTasks(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
-    TaskPartitionState state = ctx.getPartitionState(pId);
-    if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
-      return true;
-    }
-    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) {
-      return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-    }
-    return false;
-  }
-
-  // add all partitions that have been tried maxNumberAttempts
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, Set<Integer> throttled, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (!excluded.contains(pId)) {
-        if (result.size() < n) {
-          result.add(pId);
-        } else {
-          throttled.add(pId);
-        }
-      }
-    }
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
-  }
-
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
-  }
-
-  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
-
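Among the helpers removed above, getNextPartitions is the scheduling throttle: it picks at most n non-excluded candidates in order and records the overflow as throttled. The same pattern as a standalone sketch, with plain integer ids rather than the Helix signatures:

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    public class ThrottleSketch {
      // Pick at most n candidates that are not excluded; eligible candidates
      // over the cap are recorded in 'throttled' instead of being scheduled.
      static List<Integer> nextPartitions(TreeSet<Integer> candidates, Set<Integer> excluded,
          Set<Integer> throttled, int n) {
        List<Integer> result = new ArrayList<>();
        for (Integer pId : candidates) {
          if (excluded.contains(pId)) {
            continue;
          }
          if (result.size() < n) {
            result.add(pId);
          } else {
            throttled.add(pId);
          }
        }
        return result;
      }

      public static void main(String[] args) {
        Set<Integer> throttled = new LinkedHashSet<>();
        List<Integer> picked =
            nextPartitions(new TreeSet<>(List.of(1, 2, 3, 4)), Set.of(2), throttled, 2);
        System.out.println(picked);    // [1, 3]
        System.out.println(throttled); // [4]
      }
    }
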
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
+   * @param prevAssignment task partition -> (instance -> state)
    * @param allTaskPartitions all task partitionIds
+   *
+   * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
   private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment, Set<Integer> allTaskPartitions) {
+      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
+      Set<Integer> allTaskPartitions) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<Integer>());
     }
     for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = getPartitionId(partition.getPartitionName());
+      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
       if (allTaskPartitions.contains(pId)) {
         Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
         for (String instance : replicaMap.keySet()) {
@@ -862,67 +430,7 @@ public class JobRebalancer extends TaskRebalancer {
     return result;
   }
 
-  /* Extracts the partition id from the given partition name. */
-  private static int getPartitionId(String pName) {
-    int index = pName.lastIndexOf("_");
-    if (index == -1) {
-      throw new HelixException("Invalid partition name " + pName);
-    }
-    return Integer.valueOf(pName.substring(index + 1));
-  }
-
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
-      }
-    }
-    return nonReadyPartitions;
-  }
-
   private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
-    return isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal;
-  }
-
-  private boolean isGenericTaskJob(JobConfig jobConfig) {
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    return taskConfigMap != null && !taskConfigMap.isEmpty();
-  }
-
-  /**
-   * Check whether tasks are not in final states
-   * @param jobContext The job context
-   * @return False if still tasks not in final state. Otherwise return true
-   */
-  private boolean checkJobStopped(JobContext jobContext) {
-    for (int partition : jobContext.getPartitionSet()) {
-      TaskPartitionState taskState = jobContext.getPartitionState(partition);
-      if (taskState == TaskPartitionState.RUNNING) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  private String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
+    return TaskUtil.isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal;
   }
 }
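That closes out JobRebalancer. The surviving getPrevInstanceToTaskAssignments inverts the previous assignment into an instance-to-partitions map, pre-seeding only live instances so that tasks previously placed on dead instances silently drop out. A sketch of that inversion under the simplifying assumption of one instance per partition (the real code iterates a replica map):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class PrevAssignmentSketch {
      // Invert (partition -> instance) into (instance -> sorted partitions),
      // keeping only live instances and partitions the job still knows about.
      static Map<String, SortedSet<Integer>> byInstance(Set<String> liveInstances,
          Map<Integer, String> partitionToInstance, Set<Integer> allTaskPartitions) {
        Map<String, SortedSet<Integer>> result = new HashMap<>();
        for (String instance : liveInstances) {
          result.put(instance, new TreeSet<>());
        }
        for (Map.Entry<Integer, String> e : partitionToInstance.entrySet()) {
          SortedSet<Integer> assigned = result.get(e.getValue());
          if (assigned != null && allTaskPartitions.contains(e.getKey())) {
            assigned.add(e.getKey());
          }
        }
        return result;
      }

      public static void main(String[] args) {
        System.out.println(byInstance(Set.of("node1"),
            Map.of(0, "node1", 1, "deadNode"), Set.of(0, 1)));
        // {node1=[0]} -- the assignment on the dead instance is dropped
      }
    }
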

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d8b8ea9..1d29368 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -43,79 +43,19 @@ import com.google.common.collect.Maps;
 
 /**
  * Abstract rebalancer class for the {@code Task} state model.
  */
-public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
+public abstract class TaskRebalancer extends AbstractTaskDispatcher
+    implements Rebalancer, MappingCalculator {
   private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
 
-  // For connection management
-  protected HelixManager _manager;
-  protected static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
-  protected ClusterStatusMonitor _clusterStatusMonitor;
-
-  @Override public void init(HelixManager manager) {
+  @Override
+  public void init(HelixManager manager) {
     _manager = manager;
   }
 
-  @Override public abstract ResourceAssignment computeBestPossiblePartitionState(
-      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
-      CurrentStateOutput currStateOutput);
-
-  /**
-   * Checks if the workflow has finished (either completed or failed).
-   * Set the state in workflow context properly.
-   *
-   * @param ctx Workflow context containing job states
-   * @param cfg Workflow config containing set of jobs
-   * @return returns true if the workflow
-   *         1. completed (all tasks are {@link TaskState#COMPLETED})
-   *         2. failed (any task is {@link TaskState#FAILED}
-   *         3. workflow is {@link TaskState#TIMED_OUT}
-   *         returns false otherwise.
-   */
-  protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
-      Map<String, JobConfig> jobConfigMap) {
-    boolean incomplete = false;
-
-    TaskState workflowState = ctx.getWorkflowState();
-    if (TaskState.TIMED_OUT.equals(workflowState)) {
-      // We don't update job state here as JobRebalancer will do it
-      return true;
-    }
-
-    // Check if failed job count is beyond threshold and if so, fail the workflow
-    // and abort in-progress jobs
-    int failedJobs = 0;
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      TaskState jobState = ctx.getJobState(job);
-      if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
-        failedJobs++;
-        if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
-          ctx.setWorkflowState(TaskState.FAILED);
-          for (String jobToFail : cfg.getJobDag().getAllNodes()) {
-            if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
-              ctx.setJobState(jobToFail, TaskState.ABORTED);
-              // Skip aborted jobs latency since they are not accurate latency for job running time
-              if (_clusterStatusMonitor != null) {
-                _clusterStatusMonitor
-                    .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
-              }
-            }
-          }
-          return true;
-        }
-      }
-      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
-          && jobState != TaskState.TIMED_OUT) {
-        incomplete = true;
-      }
-    }
-
-    if (!incomplete && cfg.isTerminable()) {
-      ctx.setWorkflowState(TaskState.COMPLETED);
-      return true;
-    }
-    return false;
-  }
+  @Override
+  public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput);
 
   /**
    * Checks if the workflow has been stopped.
@@ -224,64 +164,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return true;
   }
 
-  protected boolean isJobStarted(String job, WorkflowContext workflowContext) {
-    TaskState jobState = workflowContext.getJobState(job);
-    return (jobState != null && jobState != TaskState.NOT_STARTED);
-  }
-
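The isWorkflowFinished block removed above encodes the workflow-level failure rule: for non-queue workflows, once more than failureThreshold jobs are FAILED or TIMED_OUT, the whole workflow is failed and its in-progress jobs are aborted. The counting core of that rule as a compact sketch with a hypothetical job-state enum:

    import java.util.List;

    public class WorkflowFailureSketch {
      enum JobState { IN_PROGRESS, COMPLETED, FAILED, TIMED_OUT }

      // A non-queue workflow fails once the number of FAILED/TIMED_OUT jobs
      // exceeds the configured failure threshold.
      static boolean isWorkflowFailed(List<JobState> jobStates, int failureThreshold) {
        int failedJobs = 0;
        for (JobState s : jobStates) {
          if (s == JobState.FAILED || s == JobState.TIMED_OUT) {
            failedJobs++;
            if (failedJobs > failureThreshold) {
              return true;
            }
          }
        }
        return false;
      }

      public static void main(String[] args) {
        List<JobState> states = List.of(JobState.FAILED, JobState.COMPLETED);
        System.out.println(isWorkflowFailed(states, 0)); // true
        System.out.println(isWorkflowFailed(states, 1)); // false
      }
    }
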
-  /**
-   * Count the number of jobs in a workflow that are in progress.
-   *
-   * @param workflowCfg
-   * @param workflowCtx
-   * @return
-   */
-  protected int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
-    int inCompleteCount = 0;
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      TaskState jobState = workflowCtx.getJobState(jobName);
-      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
-        ++inCompleteCount;
-      }
-    }
-
-    return inCompleteCount;
-  }
-
-  protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) {
-    long currentTime = System.currentTimeMillis();
-    workflowContext.setJobState(jobName, TaskState.FAILED);
-    if (jobContext != null) {
-      jobContext.setFinishTime(currentTime);
-    }
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
-      workflowContext.setFinishTime(currentTime);
-      updateWorkflowMonitor(workflowContext, workflowConfig);
-    }
-    scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
-  }
-
-  protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig,
-      long currentTime) {
-    long currentScheduledTime =
-        _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 ? Long.MAX_VALUE
-            : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
-    if (currentTime + jobConfig.getExpiry() < currentScheduledTime) {
-      _rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
-          currentTime + jobConfig.getExpiry());
-    }
-  }
-
-  protected void updateWorkflowMonitor(WorkflowContext context,
-      WorkflowConfig config) {
-    if (_clusterStatusMonitor != null) {
-      _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(),
-          context.getFinishTime() - context.getStartTime());
-    }
-  }
-
   /**
    * Check if a workflow is ready to schedule.
    *
@@ -294,41 +176,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
-  /**
-   * Basic function to check task framework resources, workflow and job, are timeout
-   * @param startTime Resources start time
-   * @param timeoutPeriod Resources timeout period. Will be -1 if it is not set.
-   * @return
-   */
-  protected boolean isTimeout(long startTime, long timeoutPeriod) {
-    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
-    return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT && nextTimeout <= System
-        .currentTimeMillis();
-  }
-
-  /**
-   * Schedule the rebalancer timer for task framework elements
-   * @param resourceId The resource id
-   * @param startTime The resource start time
-   * @param timeoutPeriod The resource timeout period. Will be -1 if it is not set.
-   */
-  protected void scheduleRebalanceForTimeout(String resourceId, long startTime,
-      long timeoutPeriod) {
-    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
-    long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
-    if (nextTimeout >= System.currentTimeMillis() && (
-        nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
-            || nextTimeout < nextRebalanceTime)) {
-      _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
-    }
-  }
-
-  private long getTimeoutTime(long startTime, long timeoutPeriod) {
-    return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
-        ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod;
-  }
-
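The removed getTimeoutTime guards the startTime + timeoutPeriod addition against long overflow, mapping both "no timeout set" and "would overflow" to the never-timeout sentinel. The same guard as a standalone sketch, assuming the sentinel is -1 as the "Will be -1 if it is not set" javadoc above suggests:

    public class TimeoutSketch {
      // Stand-in for TaskConstants.DEFAULT_NEVER_TIMEOUT (assumed -1 here).
      static final long NEVER = -1L;

      // startTime + timeoutPeriod, but NEVER when no timeout is set and
      // NEVER when the addition would overflow a long.
      static long timeoutTime(long startTime, long timeoutPeriod) {
        return (timeoutPeriod == NEVER || timeoutPeriod > Long.MAX_VALUE - startTime)
            ? NEVER : startTime + timeoutPeriod;
      }

      public static void main(String[] args) {
        System.out.println(timeoutTime(1_000L, 500L));       // 1500
        System.out.println(timeoutTime(1_000L, NEVER));      // -1 (never times out)
        System.out.println(timeoutTime(Long.MAX_VALUE, 2L)); // -1 (overflow guarded)
      }
    }

Comparing timeoutPeriod against Long.MAX_VALUE - startTime rather than checking the sum after the fact is what keeps the check free of overflow itself.
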
   @Override public IdealState computeNewIdealState(String resourceName,
       IdealState currentIdealState, CurrentStateOutput currentStateOutput,
@@ -338,12 +185,4 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return currentIdealState;
   }
-
-
-  /**
-   * Set the ClusterStatusMonitor for metrics update
-   */
-  public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) {
-    _clusterStatusMonitor = clusterStatusMonitor;
-  }
 }
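The TaskUtil diff below widens getPartitionId from protected to public so that JobRebalancer can call it. Its contract is simply "parse the numeric suffix after the last underscore"; a minimal sketch, with a standard IllegalArgumentException standing in for HelixException, and the pName helper shown for the inverse direction:

    public class PartitionNameSketch {
      // Extracts the partition id from a name of the form "<resource>_<id>".
      static int getPartitionId(String pName) {
        int index = pName.lastIndexOf('_');
        if (index == -1) {
          // HelixException in the real code; a standard exception here.
          throw new IllegalArgumentException(String.format("Invalid partition name %s", pName));
        }
        return Integer.parseInt(pName.substring(index + 1));
      }

      // The inverse helper, pName(resource, id) in JobRebalancer, is concatenation.
      static String pName(String resource, int pId) {
        return resource + "_" + pId;
      }

      public static void main(String[] args) {
        System.out.println(getPartitionId(pName("myJob", 3))); // 3
      }
    }
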

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index c6d7a55..b3a7f29 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -488,10 +489,10 @@ public class TaskUtil {
    * @param pName
    * @return
    */
-  protected static int getPartitionId(String pName) {
+  public static int getPartitionId(String pName) {
     int index = pName.lastIndexOf("_");
     if (index == -1) {
-      throw new HelixException("Invalid partition name " + pName);
+      throw new HelixException(String.format("Invalid partition name %s", pName));
     }
     return Integer.valueOf(pName.substring(index + 1));
   }
@@ -817,4 +818,62 @@ public class TaskUtil {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));
   }
+
+  public static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+    Set<Integer> nonReadyPartitions = Sets.newHashSet();
+    for (int p : ctx.getPartitionSet()) {
+      long toStart = ctx.getNextRetryTime(p);
+      if (now < toStart) {
+        nonReadyPartitions.add(p);
+      }
+    }
+    return nonReadyPartitions;
+  }
+
+  public static boolean isGenericTaskJob(JobConfig jobConfig) {
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    return taskConfigMap != null && !taskConfigMap.isEmpty();
+  }
+
+  /**
+   * Check whether tasks are just started or still running
+   *
+   * @param jobContext The job context
+   *
+   * @return False if any task is still not in a final state; otherwise true
+   */
+  public static boolean checkJobStopped(JobContext jobContext) {
+    for (int partition : jobContext.getPartitionSet()) {
+      TaskPartitionState taskState = jobContext.getPartitionState(partition);
+      if (taskState == TaskPartitionState.RUNNING) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Count the number of jobs in a workflow that are not in a final state.
+   *
+   * @param workflowCfg
+   * @param workflowCtx
+   * @return
+   */
+  public static int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+    int inCompleteCount = 0;
+    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = workflowCtx.getJobState(jobName);
+      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+        ++inCompleteCount;
+      }
+    }
+
+    return inCompleteCount;
+  }
+
+  public static boolean isJobStarted(String job, WorkflowContext workflowContext) {
+    TaskState jobState = workflowContext.getJobState(job);
+    return (jobState != null && jobState != TaskState.NOT_STARTED);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/84c2feab/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 9233f9b..32eed2c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -196,7 +196,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       return;
     }
 
-    int inCompleteAllJobCount = getInCompleteJobCount(workflowCfg, workflowCtx);
+    int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx);
     int scheduledJobs = 0;
     long timeToSchedule = Long.MAX_VALUE;
     for (String job : workflowCfg.getJobDag().getAllNodes()) {
