Repository: helix Updated Branches: refs/heads/master 516266de3 -> 35fcfa0ec
[HELIX-722] Add quotaType field to WorkflowConfig and JobConfig WorkflowConfig and JobConfig define workflows and jobs respectively. In order to support job scheduling based on quota types, we need to associate workflows and jobs with quota types and provide APIs for get/set accordingly. ChangeList: 1. Workflow and Job Config have APIs added for quota type support 2. Code formatting per Helix code formatter Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/35fcfa0e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/35fcfa0e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/35fcfa0e Branch: refs/heads/master Commit: 35fcfa0ec38e382cb7d4d981abdd4b5dcea11338 Parents: 516266d Author: Hunter Lee <[email protected]> Authored: Mon Jul 9 18:31:37 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Tue Jul 10 11:26:00 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 126 ++++++++++-------- .../org/apache/helix/task/WorkflowConfig.java | 132 +++++++++++-------- .../org/apache/helix/task/beans/JobBean.java | 1 + .../apache/helix/task/beans/WorkflowBean.java | 1 + 4 files changed, 151 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index f665dec..1892062 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -43,7 +43,8 @@ import com.google.common.collect.Maps; public class JobConfig extends ResourceConfig { /** - * Do not use this value directly, always use the get/set methods in JobConfig and JobConfig.Builder. + * Do not use this value directly, always use the get/set methods in JobConfig and + * JobConfig.Builder. */ protected enum JobConfigProperty { /** @@ -68,7 +69,8 @@ public class JobConfig extends ResourceConfig { */ TargetPartitionStates, /** - * The set of the target partition ids. The value must be a comma-separated list of partition ids. + * The set of the target partition ids. The value must be a comma-separated list of partition + * ids. */ TargetPartitions, /** @@ -151,10 +153,15 @@ public class JobConfig extends ResourceConfig { /** * Whether or not enable running task rebalance */ - RebalanceRunningTask + RebalanceRunningTask, + + /** + * Quota type for this job used for quota-based scheduling + */ + QuotaType } - //Default property values + // Default property values public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr. public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; @@ -182,7 +189,7 @@ public class JobConfig extends ResourceConfig { jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(), jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), jobConfig.getExecutionDelay(), jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(), - jobConfig.isRebalanceRunningTask()); + jobConfig.isRebalanceRunningTask(), jobConfig.getQuotaType()); } private JobConfig(String workflow, String targetResource, List<String> targetPartitions, @@ -192,7 +199,7 @@ public class JobConfig extends ResourceConfig { boolean disableExternalView, boolean ignoreDependentJobFailure, Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag, long executionDelay, long executionStart, String jobId, long expiry, - boolean rebalanceRunningTask) { + boolean rebalanceRunningTask, String quotaType) { super(jobId); putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow); putSimpleConfig(JobConfigProperty.JobID.name(), jobId); @@ -256,6 +263,7 @@ public class JobConfig extends ResourceConfig { String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE)); getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(), rebalanceRunningTask); + putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType); } public String getWorkflow() { @@ -273,8 +281,9 @@ public class JobConfig extends ResourceConfig { } public List<String> getTargetPartitions() { - return simpleConfigContains(JobConfigProperty.TargetPartitions.name()) ? Arrays - .asList(getSimpleConfig(JobConfigProperty.TargetPartitions.name()).split(",")) : null; + return simpleConfigContains(JobConfigProperty.TargetPartitions.name()) + ? Arrays.asList(getSimpleConfig(JobConfigProperty.TargetPartitions.name()).split(",")) + : null; } public Set<String> getTargetPartitionStates() { @@ -290,19 +299,19 @@ public class JobConfig extends ResourceConfig { } public Map<String, String> getJobCommandConfigMap() { - return simpleConfigContains(JobConfigProperty.JobCommandConfig.name()) - ? TaskUtil + return simpleConfigContains(JobConfigProperty.JobCommandConfig.name()) ? TaskUtil .deserializeJobCommandConfigMap(getSimpleConfig(JobConfigProperty.JobCommandConfig.name())) : null; } public long getTimeout() { - return getRecord().getLongField(JobConfigProperty.Timeout.name(), TaskConstants.DEFAULT_NEVER_TIMEOUT); + return getRecord().getLongField(JobConfigProperty.Timeout.name(), + TaskConstants.DEFAULT_NEVER_TIMEOUT); } public long getTimeoutPerTask() { - return getRecord() - .getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK); + return getRecord().getLongField(JobConfigProperty.TimeoutPerPartition.name(), + DEFAULT_TIMEOUT_PER_TASK); } public int getNumConcurrentTasksPerInstance() { @@ -311,29 +320,29 @@ public class JobConfig extends ResourceConfig { } public int getMaxAttemptsPerTask() { - return getRecord() - .getIntField(JobConfigProperty.MaxAttemptsPerTask.name(), DEFAULT_MAX_ATTEMPTS_PER_TASK); + return getRecord().getIntField(JobConfigProperty.MaxAttemptsPerTask.name(), + DEFAULT_MAX_ATTEMPTS_PER_TASK); } public int getFailureThreshold() { - return getRecord() - .getIntField(JobConfigProperty.FailureThreshold.name(), DEFAULT_FAILURE_THRESHOLD); + return getRecord().getIntField(JobConfigProperty.FailureThreshold.name(), + DEFAULT_FAILURE_THRESHOLD); } public long getTaskRetryDelay() { - return getRecord() - .getLongField(JobConfigProperty.TaskRetryDelay.name(), DEFAULT_TASK_RETRY_DELAY); + return getRecord().getLongField(JobConfigProperty.TaskRetryDelay.name(), + DEFAULT_TASK_RETRY_DELAY); } // Execution delay time will be ignored when it is negative number public long getExecutionDelay() { - return getRecord() - .getLongField(JobConfigProperty.DelayTime.name(), DEFAULT_Job_EXECUTION_DELAY_TIME); + return getRecord().getLongField(JobConfigProperty.DelayTime.name(), + DEFAULT_Job_EXECUTION_DELAY_TIME); } public long getExecutionStart() { - return getRecord() - .getLongField(JobConfigProperty.StartTime.name(), DEFAULT_JOB_EXECUTION_START_TIME); + return getRecord().getLongField(JobConfigProperty.StartTime.name(), + DEFAULT_JOB_EXECUTION_START_TIME); } public boolean isDisableExternalView() { @@ -349,8 +358,8 @@ public class JobConfig extends ResourceConfig { public Map<String, TaskConfig> getTaskConfigMap() { Map<String, TaskConfig> taskConfigMap = new HashMap<String, TaskConfig>(); for (Map.Entry<String, Map<String, String>> entry : getMapConfigs().entrySet()) { - taskConfigMap - .put(entry.getKey(), new TaskConfig(null, entry.getValue(), entry.getKey(), null)); + taskConfigMap.put(entry.getKey(), + new TaskConfig(null, entry.getValue(), entry.getKey(), null)); } return taskConfigMap; } @@ -380,6 +389,14 @@ public class JobConfig extends ResourceConfig { DEFAULT_REBALANCE_RUNNING_TASK); } + /** + * Returns the quota type for this job. + * @return quota type. null if quota type is not set + */ + public String getQuotaType() { + return getSimpleConfig(JobConfigProperty.QuotaType.name()); + } + public static JobConfig fromHelixProperty(HelixProperty property) throws IllegalArgumentException { Map<String, String> configs = property.getRecord().getSimpleFields(); @@ -414,6 +431,7 @@ public class JobConfig extends ResourceConfig { private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS; private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK; + private String _quotaType; public JobConfig build() { if (_targetResource == null && _taskConfigMap.isEmpty()) { @@ -433,12 +451,11 @@ public class JobConfig extends ResourceConfig { _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType, _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry, - _rebalanceRunningTask); + _rebalanceRunningTask, _quotaType); } /** * Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}. - * * @param cfg A map of property names to their string representations. * @return A {@link Builder}. */ @@ -464,8 +481,8 @@ public class JobConfig extends ResourceConfig { b.setCommand(cfg.get(JobConfigProperty.Command.name())); } if (cfg.containsKey(JobConfigProperty.JobCommandConfig.name())) { - Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap( - cfg.get(JobConfigProperty.JobCommandConfig.name())); + Map<String, String> commandConfigMap = TaskUtil + .deserializeJobCommandConfigMap(cfg.get(JobConfigProperty.JobCommandConfig.name())); b.setJobCommandConfigMap(commandConfigMap); } if (cfg.containsKey(JobConfigProperty.Timeout.name())) { @@ -483,8 +500,7 @@ public class JobConfig extends ResourceConfig { Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name()))); } if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) { - b.setFailureThreshold( - Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name()))); + b.setFailureThreshold(Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name()))); } if (cfg.containsKey(JobConfigProperty.TaskRetryDelay.name())) { b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TaskRetryDelay.name()))); @@ -516,6 +532,9 @@ public class JobConfig extends ResourceConfig { b.setRebalanceRunningTask( Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name()))); } + if (cfg.containsKey(JobConfigProperty.QuotaType.name())) { + b.setQuotaType(cfg.get(JobConfigProperty.QuotaType.name())); + } return b; } @@ -650,13 +669,18 @@ public class JobConfig extends ResourceConfig { return this; } + public Builder setQuotaType(String quotaType) { + _quotaType = quotaType; + return this; + } + private void validate() { if (_taskConfigMap.isEmpty() && _targetResource == null) { throw new IllegalArgumentException( String.format("%s cannot be null", JobConfigProperty.TargetResource)); } - if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates - .isEmpty()) { + if (_taskConfigMap.isEmpty() && _targetPartitionStates != null + && _targetPartitionStates.isEmpty()) { throw new IllegalArgumentException( String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates)); } @@ -681,33 +705,28 @@ public class JobConfig extends ResourceConfig { } } if (_timeout < TaskConstants.DEFAULT_NEVER_TIMEOUT) { - throw new IllegalArgumentException(String - .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout)); + throw new IllegalArgumentException( + String.format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout)); } if (_timeoutPerTask < 0) { - throw new IllegalArgumentException(String - .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition, - _timeoutPerTask)); + throw new IllegalArgumentException(String.format("%s has invalid value %s", + JobConfigProperty.TimeoutPerPartition, _timeoutPerTask)); } if (_numConcurrentTasksPerInstance < 1) { - throw new IllegalArgumentException(String - .format("%s has invalid value %s", JobConfigProperty.ConcurrentTasksPerInstance, - _numConcurrentTasksPerInstance)); + throw new IllegalArgumentException(String.format("%s has invalid value %s", + JobConfigProperty.ConcurrentTasksPerInstance, _numConcurrentTasksPerInstance)); } if (_maxAttemptsPerTask < 1) { - throw new IllegalArgumentException(String - .format("%s has invalid value %s", JobConfigProperty.MaxAttemptsPerTask, - _maxAttemptsPerTask)); + throw new IllegalArgumentException(String.format("%s has invalid value %s", + JobConfigProperty.MaxAttemptsPerTask, _maxAttemptsPerTask)); } if (_maxForcedReassignmentsPerTask < 0) { - throw new IllegalArgumentException(String - .format("%s has invalid value %s", JobConfigProperty.MaxForcedReassignmentsPerTask, - _maxForcedReassignmentsPerTask)); + throw new IllegalArgumentException(String.format("%s has invalid value %s", + JobConfigProperty.MaxForcedReassignmentsPerTask, _maxForcedReassignmentsPerTask)); } if (_failureThreshold < 0) { - throw new IllegalArgumentException(String - .format("%s has invalid value %s", JobConfigProperty.FailureThreshold, - _failureThreshold)); + throw new IllegalArgumentException(String.format("%s has invalid value %s", + JobConfigProperty.FailureThreshold, _failureThreshold)); } if (_workflow == null) { throw new IllegalArgumentException( @@ -720,14 +739,13 @@ public class JobConfig extends ResourceConfig { b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask) .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance) - .setTimeout(jobBean.timeout) - .setTimeoutPerTask(jobBean.timeoutPerPartition) + .setTimeout(jobBean.timeout).setTimeoutPerTask(jobBean.timeoutPerPartition) .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay) .setDisableExternalView(jobBean.disableExternalView) .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure) .setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay) .setExecutionStart(jobBean.executionStart) - .setRebalanceRunningTask(jobBean.rebalanceRunningTask); + .setRebalanceRunningTask(jobBean.rebalanceRunningTask).setQuotaType(jobBean.quotaType); if (jobBean.jobCommandConfigMap != null) { b.setJobCommandConfigMap(jobBean.jobCommandConfigMap); @@ -765,4 +783,4 @@ public class JobConfig extends ResourceConfig { return Arrays.asList(vals); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index f9016d1..661615d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -37,13 +37,12 @@ import org.slf4j.LoggerFactory; /** * Provides a typed interface to workflow level configurations. Validates the configurations. */ -public class WorkflowConfig extends ResourceConfig { +public class WorkflowConfig extends ResourceConfig { private static final Logger LOG = LoggerFactory.getLogger(WorkflowConfig.class); /** * Do not use these values directly, always use the getters/setters * in WorkflowConfig and WorkflowConfig.Builder. - * * For back-compatible, this class will be left for public for a while, * but it will be change to protected in future major release. */ @@ -66,8 +65,11 @@ public class WorkflowConfig extends ResourceConfig { JobPurgeInterval, /* Allow multiple jobs in this workflow to be assigned to a same instance or not */ AllowOverlapJobAssignment, - Timeout - } + Timeout, + /* Quota related fields */ + QuotaType // Quota type for workflows is a syntactic sugar for setting + // all of the jobs to this quota type + } /* Default values */ public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000; @@ -80,10 +82,9 @@ public class WorkflowConfig extends ResourceConfig { public static final boolean DEFAULT_JOB_QUEUE = false; public static final boolean DEFAULT_MONITOR_DISABLE = true; public static final boolean DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT = false; - protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; //default 30 minutes + protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; // default 30 minutes private JobDag _jobDag; - public WorkflowConfig(HelixProperty property) { super(property.getRecord()); } @@ -92,7 +93,7 @@ public class WorkflowConfig extends ResourceConfig { this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(), cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(), cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.getJobPurgeInterval(), - cfg.isAllowOverlapJobAssignment(), cfg.getTimeout()); + cfg.isAllowOverlapJobAssignment(), cfg.getTimeout(), cfg.getQuotaType()); } /* Member variables */ @@ -102,7 +103,7 @@ public class WorkflowConfig extends ResourceConfig { TargetState targetState, long expiry, int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue, Map<String, String> jobTypes, long purgeInterval, boolean allowOverlapJobAssignment, - long timeout) { + long timeout, String quotaType) { super(workflowId); putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId); @@ -119,7 +120,8 @@ public class WorkflowConfig extends ResourceConfig { putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(), String.valueOf(failureThreshold)); putSimpleConfig(WorkflowConfigProperty.JobPurgeInterval.name(), String.valueOf(purgeInterval)); - putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), String.valueOf(allowOverlapJobAssignment)); + putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), + String.valueOf(allowOverlapJobAssignment)); if (capacity > 0) { putSimpleConfig(WorkflowConfigProperty.capacity.name(), String.valueOf(capacity)); @@ -152,6 +154,11 @@ public class WorkflowConfig extends ResourceConfig { } putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(), String.valueOf(DEFAULT_MONITOR_DISABLE)); + + // Set the quota type for this workflow + if (quotaType != null) { + putSimpleConfig(WorkflowConfigProperty.QuotaType.name(), quotaType); + } } public String getWorkflowId() { @@ -160,8 +167,9 @@ public class WorkflowConfig extends ResourceConfig { public JobDag getJobDag() { if (_jobDag == null) { - _jobDag = simpleConfigContains(WorkflowConfigProperty.Dag.name()) ? JobDag - .fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : DEFAULT_JOB_DAG; + _jobDag = simpleConfigContains(WorkflowConfigProperty.Dag.name()) + ? JobDag.fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) + : DEFAULT_JOB_DAG; } return _jobDag; } @@ -175,13 +183,13 @@ public class WorkflowConfig extends ResourceConfig { } public int getParallelJobs() { - return _record - .getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS); + return _record.getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS); } public TargetState getTargetState() { - return simpleConfigContains(WorkflowConfigProperty.TargetState.name()) ? TargetState - .valueOf(getSimpleConfig(WorkflowConfigProperty.TargetState.name())) : DEFAULT_TARGET_STATE; + return simpleConfigContains(WorkflowConfigProperty.TargetState.name()) + ? TargetState.valueOf(getSimpleConfig(WorkflowConfigProperty.TargetState.name())) + : DEFAULT_TARGET_STATE; } public long getExpiry() { @@ -189,8 +197,8 @@ public class WorkflowConfig extends ResourceConfig { } public long getJobPurgeInterval() { - return _record - .getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), DEFAULT_JOB_PURGE_INTERVAL); + return _record.getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), + DEFAULT_JOB_PURGE_INTERVAL); } /** @@ -198,8 +206,8 @@ public class WorkflowConfig extends ResourceConfig { * @return */ public int getFailureThreshold() { - return _record - .getIntField(WorkflowConfigProperty.FailureThreshold.name(), DEFAULT_FAILURE_THRESHOLD); + return _record.getIntField(WorkflowConfigProperty.FailureThreshold.name(), + DEFAULT_FAILURE_THRESHOLD); } /** @@ -212,8 +220,9 @@ public class WorkflowConfig extends ResourceConfig { } public String getWorkflowType() { - return simpleConfigContains(WorkflowConfigProperty.WorkflowType.name()) ? getSimpleConfig( - WorkflowConfigProperty.WorkflowType.name()) : null; + return simpleConfigContains(WorkflowConfigProperty.WorkflowType.name()) + ? getSimpleConfig(WorkflowConfigProperty.WorkflowType.name()) + : null; } public boolean isTerminable() { @@ -225,9 +234,9 @@ public class WorkflowConfig extends ResourceConfig { } public boolean isRecurring() { - return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) && simpleConfigContains( - WorkflowConfigProperty.RecurrenceInterval.name()) && simpleConfigContains( - WorkflowConfigProperty.RecurrenceUnit.name()); + return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) + && simpleConfigContains(WorkflowConfigProperty.RecurrenceInterval.name()) + && simpleConfigContains(WorkflowConfigProperty.RecurrenceUnit.name()); } public boolean isJobQueue() { @@ -239,8 +248,9 @@ public class WorkflowConfig extends ResourceConfig { } public Map<String, String> getJobTypes() { - return mapConfigContains(WorkflowConfigProperty.JobTypes.name()) ? getMapConfig( - WorkflowConfigProperty.JobTypes.name()) : null; + return mapConfigContains(WorkflowConfigProperty.JobTypes.name()) + ? getMapConfig(WorkflowConfigProperty.JobTypes.name()) + : null; } public boolean isAllowOverlapJobAssignment() { @@ -249,13 +259,12 @@ public class WorkflowConfig extends ResourceConfig { } public long getTimeout() { - return _record - .getLongField(WorkflowConfigProperty.Timeout.name(), TaskConstants.DEFAULT_NEVER_TIMEOUT); + return _record.getLongField(WorkflowConfigProperty.Timeout.name(), + TaskConstants.DEFAULT_NEVER_TIMEOUT); } public static SimpleDateFormat getDefaultDateFormat() { - SimpleDateFormat defaultDateFormat = new SimpleDateFormat( - "MM-dd-yyyy HH:mm:ss"); + SimpleDateFormat defaultDateFormat = new SimpleDateFormat("MM-dd-yyyy HH:mm:ss"); defaultDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); return defaultDateFormat; @@ -263,15 +272,13 @@ public class WorkflowConfig extends ResourceConfig { /** * Get the scheduled start time of the workflow. - * * @return start time if the workflow schedule is set, null if no schedule config set. */ public Date getStartTime() { // Workflow with non-scheduled config is ready to schedule immediately. try { - return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) - ? WorkflowConfig.getDefaultDateFormat() - .parse(getSimpleConfig(WorkflowConfigProperty.StartTime.name())) + return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) ? WorkflowConfig + .getDefaultDateFormat().parse(getSimpleConfig(WorkflowConfigProperty.StartTime.name())) : null; } catch (ParseException e) { LOG.error("Unparseable date " + getSimpleConfig(WorkflowConfigProperty.StartTime.name()), e); @@ -285,7 +292,6 @@ public class WorkflowConfig extends ResourceConfig { /** * Get a ScheduleConfig from a workflow config string map - * * @param cfg the string map * @return a ScheduleConfig if one exists, otherwise null */ @@ -297,24 +303,29 @@ public class WorkflowConfig extends ResourceConfig { startTime = WorkflowConfig.getDefaultDateFormat() .parse(cfg.get(WorkflowConfigProperty.StartTime.name())); } catch (ParseException e) { - LOG.error( - "Unparseable date " + cfg.get(WorkflowConfigProperty.StartTime.name()), - e); + LOG.error("Unparseable date " + cfg.get(WorkflowConfigProperty.StartTime.name()), e); return null; } } - if (cfg.containsKey(WorkflowConfigProperty.RecurrenceUnit.name()) && cfg - .containsKey(WorkflowConfigProperty.RecurrenceInterval.name())) { + if (cfg.containsKey(WorkflowConfigProperty.RecurrenceUnit.name()) + && cfg.containsKey(WorkflowConfigProperty.RecurrenceInterval.name())) { return ScheduleConfig.recurringFromDate(startTime, TimeUnit.valueOf(cfg.get(WorkflowConfigProperty.RecurrenceUnit.name())), - Long.parseLong( - cfg.get(WorkflowConfigProperty.RecurrenceInterval.name()))); + Long.parseLong(cfg.get(WorkflowConfigProperty.RecurrenceInterval.name()))); } else if (startTime != null) { return ScheduleConfig.oneTimeDelayedStart(startTime); } return null; } + /** + * Returns the quota type set for this workflow. + * @return quotaType string, null if quota type is not set + */ + public String getQuotaType() { + return getSimpleConfig(WorkflowConfigProperty.QuotaType.name()); + } + public static WorkflowConfig fromHelixProperty(HelixProperty property) throws IllegalArgumentException { Map<String, String> configs = property.getRecord().getSimpleFields(); @@ -341,16 +352,18 @@ public class WorkflowConfig extends ResourceConfig { private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL; private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT; private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT; + private String _quotaType = null; public WorkflowConfig build() { validate(); return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue, - _jobTypes, _jobPurgeInterval, _allowOverlapJobAssignment, _timeout); + _jobTypes, _jobPurgeInterval, _allowOverlapJobAssignment, _timeout, _quotaType); } - public Builder() {} + public Builder() { + } public Builder(String workflowId) { _workflowId = workflowId; @@ -372,6 +385,7 @@ public class WorkflowConfig extends ResourceConfig { _jobPurgeInterval = workflowConfig.getJobPurgeInterval(); _allowOverlapJobAssignment = workflowConfig.isAllowOverlapJobAssignment(); _timeout = workflowConfig.getTimeout(); + _quotaType = workflowConfig.getQuotaType(); } public Builder setWorkflowId(String v) { @@ -397,10 +411,8 @@ public class WorkflowConfig extends ResourceConfig { /** * The expiry time for this workflow. Helix may clean up the workflow information after the * expiry time from the completion of the workflow. - * * @param v * @param unit - * * @return */ public Builder setExpiry(long v, TimeUnit unit) { @@ -411,9 +423,7 @@ public class WorkflowConfig extends ResourceConfig { /** * The expiry time for this workflow. Helix may clean up the workflow information after the * expiry time from the completion of the workflow. - * * @param v in milliseconds - * * @return */ public Builder setExpiry(long v) { @@ -424,9 +434,7 @@ public class WorkflowConfig extends ResourceConfig { /** * The time periodical Helix should clean up all completed jobs. This config applies only on * JobQueue. - * * @param t in milliseconds - * * @return */ public Builder setJobPurgeInterval(long t) { @@ -436,9 +444,7 @@ public class WorkflowConfig extends ResourceConfig { /** * The max allowed numbers of failed jobs before Helix should marks the workflow failure. - * * @param failureThreshold - * * @return */ public Builder setFailureThreshold(int failureThreshold) { @@ -483,7 +489,7 @@ public class WorkflowConfig extends ResourceConfig { /** * ONLY generic workflow can be timeouted. JobQueue does not allow to be timeouted. - * @param timeout The timeout period + * @param timeout The timeout period * @return */ public Builder setTimeout(long timeout) { @@ -502,6 +508,17 @@ public class WorkflowConfig extends ResourceConfig { return this; } + /** + * Set the quota type for this workflow. If a workflow has a quota type set, + * all of its jobs will be of that quota type. + * @param quotaType + * @return + */ + public Builder setQuotaType(String quotaType) { + _quotaType = quotaType; + return this; + } + @Deprecated public static Builder fromMap(Map<String, String> cfg) { Builder builder = new Builder(); @@ -586,6 +603,10 @@ public class WorkflowConfig extends ResourceConfig { setTimeout(Long.parseLong(cfg.get(WorkflowConfigProperty.Timeout.name()))); } + if (cfg.containsKey(WorkflowConfigProperty.QuotaType.name())) { + setQuotaType(cfg.get(WorkflowConfigProperty.QuotaType.name())); + } + return this; } @@ -635,6 +656,7 @@ public class WorkflowConfig extends ResourceConfig { b.setScheduleConfig(ScheduleConfig.from(workflowBean.schedule)); } b.setExpiry(workflowBean.expiry); + b.setQuotaType(workflowBean.quotaType); return b; } @@ -643,8 +665,8 @@ public class WorkflowConfig extends ResourceConfig { _taskDag.validate(); if (_expiry < 0) { - throw new HelixException(String - .format("%s has invalid value %s", WorkflowConfigProperty.Expiry.name(), _expiry)); + throw new HelixException(String.format("%s has invalid value %s", + WorkflowConfigProperty.Expiry.name(), _expiry)); } else if (_scheduleConfig != null && !_scheduleConfig.isValid()) { throw new HelixException( "Scheduler configuration is invalid. The configuration must have a start time if it is " http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index 5674d92..e0f1a70 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -51,4 +51,5 @@ public class JobBean { public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS; public boolean rebalanceRunningTask = JobConfig.DEFAULT_REBALANCE_RUNNING_TASK; + public String quotaType; } http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java index 2a9563e..ae01f5d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java @@ -32,4 +32,5 @@ public class WorkflowBean { public ScheduleBean schedule; public long expiry = WorkflowConfig.DEFAULT_EXPIRY; public String workflowType; + public String quotaType; // Syntactic sugar for setting all of workflow's jobs to this quota type }
