Repository: helix Updated Branches: refs/heads/master b91d6eee4 -> 1b4e0bbb8
Create AbstractTaskDispatcher for the future Task Framework. Refactor the existing code, moving logic that will be needed in the future into AbstractTaskDispatcher. RB=1308274 BUG=HELIX-985 G=helix-reviewers A=hrzhang Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1b4e0bbb Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1b4e0bbb Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1b4e0bbb Branch: refs/heads/master Commit: 1b4e0bbb89736ea7e0bd92c1af98e1ab102397f1 Parents: b91d6ee Author: Junkai Xue <[email protected]> Authored: Fri May 11 11:44:11 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Fri Sep 21 15:14:38 2018 -0700 ---------------------------------------------------------------------- .../helix/task/AbstractTaskDispatcher.java | 5 +++- .../org/apache/helix/task/JobRebalancer.java | 17 +++++++----- .../org/apache/helix/task/TaskRebalancer.java | 28 +++++++++++--------- .../java/org/apache/helix/task/TaskUtil.java | 4 +++ 4 files changed, 34 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index e230fb5..9a5b899 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -19,6 +19,7 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.assigner.AssignableInstance; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-66,6 +67,7 @@ public abstract class AbstractTaskDispatcher { // Check for pending state transitions on this (partition, instance). Message pendingMessage = currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance); + if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) { processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance, pendingMessage, jobState, currState, paMap, assignedPartitions); @@ -818,6 +820,7 @@ public abstract class AbstractTaskDispatcher { incomplete = true; } } + if (!incomplete && cfg.isTerminable()) { ctx.setWorkflowState(TaskState.COMPLETED); return true; @@ -925,4 +928,4 @@ public abstract class AbstractTaskDispatcher { // This is a targeted task return pName(jobCfg.getJobId(), partitionNum); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 5b29c23..4dbf0a0 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -33,6 +33,7 @@ import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; @@ -55,8 +56,9 @@ public class JobRebalancer extends TaskRebalancer { private static final String PREV_RA_NODE = "PreviousResourceAssignment"; @Override - public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache 
clusterData, - IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { + public ResourceAssignment computeBestPossiblePartitionState( + ClusterDataCache clusterData, IdealState taskIs, Resource resource, + CurrentStateOutput currStateOutput) { final String jobName = resource.getResourceName(); LOG.debug("Computer Best Partition for job: " + jobName); @@ -258,6 +260,7 @@ public class JobRebalancer extends TaskRebalancer { || (jobCfg.getTargetResource() != null && cache.getIdealState(jobCfg.getTargetResource()) != null && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) { + if (isJobFinished(jobCtx, jobResource, currStateOutput)) { failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache); return buildEmptyAssignment(jobResource, currStateOutput); @@ -340,8 +343,8 @@ public class JobRebalancer extends TaskRebalancer { Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition, instance); // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished - if (state == TaskPartitionState.RUNNING - || (state == TaskPartitionState.INIT && pendingMessage != null)) { + if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT + && pendingMessage != null)) { return false; } } @@ -351,6 +354,7 @@ public class JobRebalancer extends TaskRebalancer { /** * Get the last task assignment for a given job * @param resourceName the name of the job + * * @return {@link ResourceAssignment} instance, or null if no assignment is available */ private ResourceAssignment getPrevResourceAssignment(String resourceName) { @@ -395,8 +399,9 @@ public class JobRebalancer extends TaskRebalancer { /** * @param liveInstances - * @param prevAssignment task partition -> (instance -> state) + * @param prevAssignment task partition -> (instance -> state) * @param allTaskPartitions all task partitionIds + * * @return instance -> partitionIds from previous assignment, if 
the instance is still live */ private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments( @@ -438,4 +443,4 @@ public class JobRebalancer extends TaskRebalancer { } return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index dae0da6..2f88e24 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -47,26 +47,25 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher implements Rebalancer, MappingCalculator { private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class); - @Override - public void init(HelixManager manager) { + @Override public void init(HelixManager manager) { _manager = manager; } - @Override - public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, + @Override public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput); /** * Checks if the workflow has been stopped. + * * @param ctx Workflow context containing task states * @param cfg Workflow config containing set of tasks + * * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise. 
*/ protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) { for (String job : cfg.getJobDag().getAllNodes()) { TaskState jobState = ctx.getJobState(job); - if (jobState != null - && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) { + if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) { return false; } } @@ -90,9 +89,11 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher /** * Check all the dependencies of a job to determine whether the job is ready to be scheduled. + * * @param job * @param workflowCfg * @param workflowCtx + * * @return */ protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg, @@ -116,8 +117,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher // If there is any parent job not started, this job should not be scheduled if (notStartedCount > 0) { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job, - notStartedCount)); + LOG.debug(String + .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount)); } return false; } @@ -132,8 +133,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) { markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job, - failedOrTimeoutCount)); + LOG.debug(String + .format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount)); } return false; } @@ -164,7 +165,9 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher /** * Check if a workflow is ready to schedule. 
+ * * @param workflowCfg the workflow to check + * * @return true if the workflow is ready for schedule, false if not ready */ protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) { @@ -173,11 +176,10 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher return (startTime == null || startTime.getTime() <= System.currentTimeMillis()); } - @Override - public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { // All of the heavy lifting is in the ResourceAssignment computation, // so this part can just be a no-op. return currentIdealState; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index ded3aa2..1ce448c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -838,7 +838,9 @@ public class TaskUtil { /** * Check whether tasks are just started or still running + * * @param jobContext The job context + * * @return False if still tasks not in final state. Otherwise return true */ public static boolean checkJobStopped(JobContext jobContext) { @@ -851,8 +853,10 @@ public class TaskUtil { return true; } + /** * Count the number of jobs in a workflow that are not in final state. + * * @param workflowCfg * @param workflowCtx * @return
