Add timeout in JobConfig. To support a job-level timeout in the task framework, add the configuration field. Associated changes are made in the Builder and JobBean.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/177d5bdc Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/177d5bdc Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/177d5bdc Branch: refs/heads/master Commit: 177d5bdc29fc2011e12ca82d7bdf5456ef31a956 Parents: 408082a Author: Junkai Xue <[email protected]> Authored: Tue Oct 3 15:18:32 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Tue Oct 3 15:18:32 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 30 ++++++++++++++++++-- .../java/org/apache/helix/task/TaskState.java | 7 ++++- .../org/apache/helix/task/beans/JobBean.java | 1 + 3 files changed, 34 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 12aa058..6c3aed4 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -80,6 +80,10 @@ public class JobConfig extends ResourceConfig { */ JobCommandConfig, /** + * The allowed execution time of the job. + */ + Timeout, + /** * The timeout for a task. */ TimeoutPerPartition, @@ -151,6 +155,7 @@ public class JobConfig extends ResourceConfig { } //Default property values + public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE; public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr. 
public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; @@ -171,7 +176,7 @@ public class JobConfig extends ResourceConfig { public JobConfig(String jobId, JobConfig jobConfig) { this(jobConfig.getWorkflow(), jobConfig.getTargetResource(), jobConfig.getTargetPartitions(), jobConfig.getTargetPartitionStates(), jobConfig.getCommand(), - jobConfig.getJobCommandConfigMap(), jobConfig.getTimeoutPerTask(), + jobConfig.getJobCommandConfigMap(), jobConfig.getTimeout(), jobConfig.getTimeoutPerTask(), jobConfig.getNumConcurrentTasksPerInstance(), jobConfig.getMaxAttemptsPerTask(), jobConfig.getMaxAttemptsPerTask(), jobConfig.getFailureThreshold(), jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(), @@ -183,7 +188,7 @@ public class JobConfig extends ResourceConfig { private JobConfig(String workflow, String targetResource, List<String> targetPartitions, Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap, - long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, + long timeout, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay, boolean disableExternalView, boolean ignoreDependentJobFailure, Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag, @@ -221,6 +226,7 @@ public class JobConfig extends ResourceConfig { if (executionStart > 0) { getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart); } + getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout); getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask); getRecord().setIntField(JobConfigProperty.MaxAttemptsPerTask.name(), maxAttemptsPerTask); getRecord().setIntField(JobConfigProperty.MaxForcedReassignmentsPerTask.name(), @@ -289,6 +295,10 @@ public class JobConfig extends ResourceConfig { 
: null; } + public long getTimeout() { + return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT); + } + public long getTimeoutPerTask() { return getRecord() .getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK); @@ -389,6 +399,7 @@ public class JobConfig extends ResourceConfig { private String _command; private Map<String, String> _commandConfig; private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap(); + private long _timeout = DEFAULT_TIMEOUT; private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK; private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK; @@ -417,7 +428,7 @@ public class JobConfig extends ResourceConfig { validate(); return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, - _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, + _command, _commandConfig, _timeout, _timeoutPerTask, _numConcurrentTasksPerInstance, _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType, _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry, @@ -456,6 +467,9 @@ public class JobConfig extends ResourceConfig { cfg.get(JobConfigProperty.JobCommandConfig.name())); b.setJobCommandConfigMap(commandConfigMap); } + if (cfg.containsKey(JobConfigProperty.Timeout.name())) { + b.setTimeout(Long.parseLong(cfg.get(JobConfigProperty.Timeout.name()))); + } if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) { b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name()))); } @@ -544,6 +558,11 @@ public class JobConfig extends ResourceConfig { return this; } + public Builder setTimeout(long v) { + _timeout = v; + return this; + } + public Builder setTimeoutPerTask(long v) { _timeoutPerTask = v; return this; 
@@ -660,6 +679,10 @@ public class JobConfig extends ResourceConfig { } } } + if (_timeout < 0) { + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout)); + } if (_timeoutPerTask < 0) { throw new IllegalArgumentException(String .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition, @@ -696,6 +719,7 @@ public class JobConfig extends ResourceConfig { b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask) .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance) + .setTimeout(jobBean.timeout) .setTimeoutPerTask(jobBean.timeoutPerPartition) .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay) .setDisableExternalView(jobBean.disableExternalView) http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/TaskState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java index 4e12f2d..3713c40 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java @@ -50,5 +50,10 @@ public enum TaskState { /** * The task are aborted due to workflow fail */ - ABORTED + ABORTED, + /** + * The allowed execution time for the job. 
+ * TODO: also use this for the task + */ + TIMED_OUT } http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index 7b42ad2..8d2f259 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -38,6 +38,7 @@ public class JobBean { public String command; public Map<String, String> jobCommandConfigMap; public List<TaskBean> tasks; + public long timeout = JobConfig.DEFAULT_TIMEOUT; public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
