Repository: helix Updated Branches: refs/heads/master b3ee27d4a -> e1176fe40
Fix job-level timeout not timing out jobs, and refactor the logic. There was an issue where a job did not get timed out: the rebalance scheduler was not scheduled when the job started. This commit fixes that issue and refactors the logic to support workflow-level timeout. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d742d098 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d742d098 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d742d098 Branch: refs/heads/master Commit: d742d09846c8b39cd46c9d8560ca856355530a88 Parents: b3ee27d Author: Junkai Xue <[email protected]> Authored: Thu Feb 8 15:22:18 2018 -0800 Committer: Junkai Xue <[email protected]> Committed: Tue Mar 20 11:55:27 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 9 +++-- .../org/apache/helix/task/JobRebalancer.java | 34 +++--------------- .../org/apache/helix/task/TaskConstants.java | 2 ++ .../org/apache/helix/task/TaskRebalancer.java | 36 ++++++++++++++++++++ .../org/apache/helix/task/beans/JobBean.java | 3 +- .../helix/integration/task/TestJobTimeout.java | 3 +- .../task/TestJobTimeoutTaskNotStarted.java | 3 +- 7 files changed, 51 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index fe2b244..f665dec 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -155,7 +155,6 @@ public class JobConfig extends ResourceConfig { } //Default property values - public static final long DEFAULT_TIMEOUT_NEVER = 
-1; // never timeout public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr. public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; @@ -226,7 +225,7 @@ public class JobConfig extends ResourceConfig { if (executionStart > 0) { getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart); } - if (timeout > DEFAULT_TIMEOUT_NEVER) { + if (timeout > TaskConstants.DEFAULT_NEVER_TIMEOUT) { getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout); } getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask); @@ -298,7 +297,7 @@ public class JobConfig extends ResourceConfig { } public long getTimeout() { - return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT_NEVER); + return getRecord().getLongField(JobConfigProperty.Timeout.name(), TaskConstants.DEFAULT_NEVER_TIMEOUT); } public long getTimeoutPerTask() { @@ -401,7 +400,7 @@ public class JobConfig extends ResourceConfig { private String _command; private Map<String, String> _commandConfig; private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap(); - private long _timeout = DEFAULT_TIMEOUT_NEVER; + private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT; private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK; private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK; @@ -681,7 +680,7 @@ public class JobConfig extends ResourceConfig { } } } - if (_timeout < DEFAULT_TIMEOUT_NEVER) { + if (_timeout < TaskConstants.DEFAULT_NEVER_TIMEOUT) { throw new IllegalArgumentException(String .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout)); } http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- 
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 51da264..a855f7d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -132,7 +132,9 @@ public class JobRebalancer extends TaskRebalancer { workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS); } - scheduleRebalanceForJobTimeout(jobCfg, jobCtx); + if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) { + scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout()); + } // Grab the old assignment, or an empty one if it doesn't exist ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName); @@ -212,7 +214,8 @@ public class JobRebalancer extends TaskRebalancer { TargetState jobTgtState = workflowConfig.getTargetState(); TaskState jobState = workflowCtx.getJobState(jobResource); - if (jobState == TaskState.IN_PROGRESS && isJobTimeout(jobCtx, jobCfg)) { + if (jobState == TaskState.IN_PROGRESS && isTimeout(jobCtx.getStartTime(), + jobCfg.getTimeout())) { jobState = TaskState.TIMING_OUT; workflowCtx.setJobState(jobResource, TaskState.TIMING_OUT); } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) { @@ -626,12 +629,6 @@ public class JobRebalancer extends TaskRebalancer { TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName); } - private boolean isJobTimeout(JobContext jobContext, JobConfig jobConfig) { - long jobTimeoutTime = computeJobTimeoutTime(jobContext, jobConfig); - return jobTimeoutTime != jobConfig.DEFAULT_TIMEOUT_NEVER && jobTimeoutTime <= System - .currentTimeMillis(); - } - private boolean isJobFinished(JobContext jobContext, String jobResource, CurrentStateOutput currentStateOutput) { for (int pId : jobContext.getPartitionSet()) { @@ -648,15 +645,6 @@ public class JobRebalancer extends TaskRebalancer { 
return true; } - // Return jobConfig.DEFAULT_TIMEOUT_NEVER if job should never timeout. - // job start time can't be -1 before calling this method. - private long computeJobTimeoutTime(JobContext jobContext, JobConfig jobConfig) { - return (jobConfig.getTimeout() == JobConfig.DEFAULT_TIMEOUT_NEVER - || jobConfig.getTimeout() > Long.MAX_VALUE - jobContext.getStartTime()) // check long overflow - ? jobConfig.DEFAULT_TIMEOUT_NEVER - : jobContext.getStartTime() + jobConfig.getTimeout(); - } - private void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) { long currentTime = System.currentTimeMillis(); @@ -694,18 +682,6 @@ public class JobRebalancer extends TaskRebalancer { } } - // Set job timeout rebalance, if the time is earlier than the current scheduled rebalance time - // This needs to run for every rebalance because the scheduled rebalance could be removed in other places. - private void scheduleRebalanceForJobTimeout(JobConfig jobCfg, JobContext jobCtx) { - long jobTimeoutTime = computeJobTimeoutTime(jobCtx, jobCfg); - if (jobTimeoutTime != JobConfig.DEFAULT_TIMEOUT_NEVER && jobTimeoutTime > System.currentTimeMillis()) { - long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(jobCfg.getJobId()); - if (nextRebalanceTime == JobConfig.DEFAULT_TIMEOUT_NEVER || jobTimeoutTime < nextRebalanceTime) { - _rebalanceScheduler.scheduleRebalance(_manager, jobCfg.getJobId(), jobTimeoutTime); - } - } - } - /** * Get the last task assignment for a given job * http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java index 4752602..eee57d3 100644 --- 
a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java @@ -44,4 +44,6 @@ public class TaskConstants { */ public static final String CONTEXT_NODE = "Context"; + public static final long DEFAULT_NEVER_TIMEOUT = -1; // never timeout + } http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 19a4049..a5e5271 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -270,6 +270,40 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return (startTime == null || startTime.getTime() <= System.currentTimeMillis()); } + /** + * Basic function to check task framework resources, workflow and job, are timeout + * @param startTime Resources start time + * @param timeoutPeriod Resources timeout period. Will be -1 if it is not set. + * @return + */ + protected boolean isTimeout(long startTime, long timeoutPeriod) { + long nextTimeout = getTimeoutTime(startTime, timeoutPeriod); + return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT && nextTimeout <= System + .currentTimeMillis(); + } + + /** + * Schedule the rebalancer timer for task framework elements + * @param resourceId The resource id + * @param startTime The resource start time + * @param timeoutPeriod The resource timeout period. Will be -1 if it is not set. 
+ */ + protected void scheduleRebalanceForTimeout(String resourceId, long startTime, + long timeoutPeriod) { + long nextTimeout = getTimeoutTime(startTime, timeoutPeriod); + long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId); + if (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT + || nextTimeout < nextRebalanceTime) { + _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout); + } + } + + private long getTimeoutTime(long startTime, long timeoutPeriod) { + return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT + || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow + ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod; + } + @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, @@ -279,6 +313,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return currentIdealState; } + + /** * Set the ClusterStatusMonitor for metrics update */ http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index 913b989..5674d92 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskConstants; /** * Bean class used for parsing job definitions from YAML. 
@@ -38,7 +39,7 @@ public class JobBean { public String command; public Map<String, String> jobCommandConfigMap; public List<TaskBean> tasks; - public long timeout = JobConfig.DEFAULT_TIMEOUT_NEVER; + public long timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT; public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java index e6f2116..56fc04a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java @@ -45,12 +45,11 @@ public final class TestJobTimeout extends TaskSynchronizedTestBase { @BeforeClass public void beforeClass() throws Exception { - _participants = new MockParticipantManager[_numNodes]; _numNodes = 2; _numParitions = 2; _numReplicas = 1; // only Master, no Slave _numDbs = 1; - + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursively(namespace); http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java index 9521e4c..6129946 
100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java @@ -56,12 +56,11 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase { @BeforeClass public void beforeClass() throws Exception { - _participants = new MockParticipantManager[_numNodes]; _numDbs = 1; _numNodes = 1; _numParitions = 50; _numReplicas = 1; - + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursively(namespace);
