Repository: helix
Updated Branches:
  refs/heads/master 516266de3 -> 35fcfa0ec


[HELIX-722] Add quotaType field to WorkflowConfig and JobConfig

WorkflowConfig and JobConfig define workflows and jobs respectively. In order 
to support job scheduling based on quota types, we need to associate workflows 
and jobs with quota types and provide APIs for get/set accordingly.

ChangeList:
1. Workflow and Job Config have APIs added for quota type support
2. Code formatting per Helix code formatter


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/35fcfa0e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/35fcfa0e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/35fcfa0e

Branch: refs/heads/master
Commit: 35fcfa0ec38e382cb7d4d981abdd4b5dcea11338
Parents: 516266d
Author: Hunter Lee <[email protected]>
Authored: Mon Jul 9 18:31:37 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Tue Jul 10 11:26:00 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 126 ++++++++++--------
 .../org/apache/helix/task/WorkflowConfig.java   | 132 +++++++++++--------
 .../org/apache/helix/task/beans/JobBean.java    |   1 +
 .../apache/helix/task/beans/WorkflowBean.java   |   1 +
 4 files changed, 151 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java 
b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index f665dec..1892062 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -43,7 +43,8 @@ import com.google.common.collect.Maps;
 public class JobConfig extends ResourceConfig {
 
   /**
-   * Do not use this value directly, always use the get/set methods in 
JobConfig and JobConfig.Builder.
+   * Do not use this value directly, always use the get/set methods in 
JobConfig and
+   * JobConfig.Builder.
    */
   protected enum JobConfigProperty {
     /**
@@ -68,7 +69,8 @@ public class JobConfig extends ResourceConfig {
      */
     TargetPartitionStates,
     /**
-     * The set of the target partition ids. The value must be a 
comma-separated list of partition ids.
+     * The set of the target partition ids. The value must be a 
comma-separated list of partition
+     * ids.
      */
     TargetPartitions,
     /**
@@ -151,10 +153,15 @@ public class JobConfig extends ResourceConfig {
     /**
      * Whether or not enable running task rebalance
      */
-    RebalanceRunningTask
+    RebalanceRunningTask,
+
+    /**
+     * Quota type for this job used for quota-based scheduling
+     */
+    QuotaType
   }
 
-  //Default property values
+  // Default property values
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -182,7 +189,7 @@ public class JobConfig extends ResourceConfig {
         jobConfig.isIgnoreDependentJobFailure(), jobConfig.getTaskConfigMap(),
         jobConfig.getJobType(), jobConfig.getInstanceGroupTag(), 
jobConfig.getExecutionDelay(),
         jobConfig.getExecutionStart(), jobId, jobConfig.getExpiry(),
-        jobConfig.isRebalanceRunningTask());
+        jobConfig.isRebalanceRunningTask(), jobConfig.getQuotaType());
   }
 
   private JobConfig(String workflow, String targetResource, List<String> 
targetPartitions,
@@ -192,7 +199,7 @@ public class JobConfig extends ResourceConfig {
       boolean disableExternalView, boolean ignoreDependentJobFailure,
       Map<String, TaskConfig> taskConfigMap, String jobType, String 
instanceGroupTag,
       long executionDelay, long executionStart, String jobId, long expiry,
-      boolean rebalanceRunningTask) {
+      boolean rebalanceRunningTask, String quotaType) {
     super(jobId);
     putSimpleConfig(JobConfigProperty.WorkflowID.name(), workflow);
     putSimpleConfig(JobConfigProperty.JobID.name(), jobId);
@@ -256,6 +263,7 @@ public class JobConfig extends ResourceConfig {
         String.valueOf(WorkflowConfig.DEFAULT_MONITOR_DISABLE));
     getRecord().setBooleanField(JobConfigProperty.RebalanceRunningTask.name(),
         rebalanceRunningTask);
+    putSimpleConfig(JobConfigProperty.QuotaType.name(), quotaType);
   }
 
   public String getWorkflow() {
@@ -273,8 +281,9 @@ public class JobConfig extends ResourceConfig {
   }
 
   public List<String> getTargetPartitions() {
-    return simpleConfigContains(JobConfigProperty.TargetPartitions.name()) ? 
Arrays
-        
.asList(getSimpleConfig(JobConfigProperty.TargetPartitions.name()).split(",")) 
: null;
+    return simpleConfigContains(JobConfigProperty.TargetPartitions.name())
+        ? 
Arrays.asList(getSimpleConfig(JobConfigProperty.TargetPartitions.name()).split(","))
+        : null;
   }
 
   public Set<String> getTargetPartitionStates() {
@@ -290,19 +299,19 @@ public class JobConfig extends ResourceConfig {
   }
 
   public Map<String, String> getJobCommandConfigMap() {
-    return simpleConfigContains(JobConfigProperty.JobCommandConfig.name())
-        ? TaskUtil
+    return simpleConfigContains(JobConfigProperty.JobCommandConfig.name()) ? 
TaskUtil
         
.deserializeJobCommandConfigMap(getSimpleConfig(JobConfigProperty.JobCommandConfig.name()))
         : null;
   }
 
   public long getTimeout() {
-    return getRecord().getLongField(JobConfigProperty.Timeout.name(), 
TaskConstants.DEFAULT_NEVER_TIMEOUT);
+    return getRecord().getLongField(JobConfigProperty.Timeout.name(),
+        TaskConstants.DEFAULT_NEVER_TIMEOUT);
   }
 
   public long getTimeoutPerTask() {
-    return getRecord()
-        .getLongField(JobConfigProperty.TimeoutPerPartition.name(), 
DEFAULT_TIMEOUT_PER_TASK);
+    return 
getRecord().getLongField(JobConfigProperty.TimeoutPerPartition.name(),
+        DEFAULT_TIMEOUT_PER_TASK);
   }
 
   public int getNumConcurrentTasksPerInstance() {
@@ -311,29 +320,29 @@ public class JobConfig extends ResourceConfig {
   }
 
   public int getMaxAttemptsPerTask() {
-    return getRecord()
-        .getIntField(JobConfigProperty.MaxAttemptsPerTask.name(), 
DEFAULT_MAX_ATTEMPTS_PER_TASK);
+    return getRecord().getIntField(JobConfigProperty.MaxAttemptsPerTask.name(),
+        DEFAULT_MAX_ATTEMPTS_PER_TASK);
   }
 
   public int getFailureThreshold() {
-    return getRecord()
-        .getIntField(JobConfigProperty.FailureThreshold.name(), 
DEFAULT_FAILURE_THRESHOLD);
+    return getRecord().getIntField(JobConfigProperty.FailureThreshold.name(),
+        DEFAULT_FAILURE_THRESHOLD);
   }
 
   public long getTaskRetryDelay() {
-    return getRecord()
-        .getLongField(JobConfigProperty.TaskRetryDelay.name(), 
DEFAULT_TASK_RETRY_DELAY);
+    return getRecord().getLongField(JobConfigProperty.TaskRetryDelay.name(),
+        DEFAULT_TASK_RETRY_DELAY);
   }
 
   // Execution delay time will be ignored when it is negative number
   public long getExecutionDelay() {
-    return getRecord()
-        .getLongField(JobConfigProperty.DelayTime.name(), 
DEFAULT_Job_EXECUTION_DELAY_TIME);
+    return getRecord().getLongField(JobConfigProperty.DelayTime.name(),
+        DEFAULT_Job_EXECUTION_DELAY_TIME);
   }
 
   public long getExecutionStart() {
-    return getRecord()
-        .getLongField(JobConfigProperty.StartTime.name(), 
DEFAULT_JOB_EXECUTION_START_TIME);
+    return getRecord().getLongField(JobConfigProperty.StartTime.name(),
+        DEFAULT_JOB_EXECUTION_START_TIME);
   }
 
   public boolean isDisableExternalView() {
@@ -349,8 +358,8 @@ public class JobConfig extends ResourceConfig {
   public Map<String, TaskConfig> getTaskConfigMap() {
     Map<String, TaskConfig> taskConfigMap = new HashMap<String, TaskConfig>();
     for (Map.Entry<String, Map<String, String>> entry : 
getMapConfigs().entrySet()) {
-      taskConfigMap
-          .put(entry.getKey(), new TaskConfig(null, entry.getValue(), 
entry.getKey(), null));
+      taskConfigMap.put(entry.getKey(),
+          new TaskConfig(null, entry.getValue(), entry.getKey(), null));
     }
     return taskConfigMap;
   }
@@ -380,6 +389,14 @@ public class JobConfig extends ResourceConfig {
         DEFAULT_REBALANCE_RUNNING_TASK);
   }
 
+  /**
+   * Returns the quota type for this job.
+   * @return quota type. null if quota type is not set
+   */
+  public String getQuotaType() {
+    return getSimpleConfig(JobConfigProperty.QuotaType.name());
+  }
+
   public static JobConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -414,6 +431,7 @@ public class JobConfig extends ResourceConfig {
     private boolean _ignoreDependentJobFailure = 
DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
     private int _numberOfTasks = DEFAULT_NUMBER_OF_TASKS;
     private boolean _rebalanceRunningTask = DEFAULT_REBALANCE_RUNNING_TASK;
+    private String _quotaType;
 
     public JobConfig build() {
       if (_targetResource == null && _taskConfigMap.isEmpty()) {
@@ -433,12 +451,11 @@ public class JobConfig extends ResourceConfig {
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, 
_failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, 
_jobType,
           _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
-          _rebalanceRunningTask);
+          _rebalanceRunningTask, _quotaType);
     }
 
     /**
      * Convenience method to build a {@link JobConfig} from a {@code 
Map&lt;String, String&gt;}.
-     *
      * @param cfg A map of property names to their string representations.
      * @return A {@link Builder}.
      */
@@ -464,8 +481,8 @@ public class JobConfig extends ResourceConfig {
         b.setCommand(cfg.get(JobConfigProperty.Command.name()));
       }
       if (cfg.containsKey(JobConfigProperty.JobCommandConfig.name())) {
-        Map<String, String> commandConfigMap = 
TaskUtil.deserializeJobCommandConfigMap(
-            cfg.get(JobConfigProperty.JobCommandConfig.name()));
+        Map<String, String> commandConfigMap = TaskUtil
+            
.deserializeJobCommandConfigMap(cfg.get(JobConfigProperty.JobCommandConfig.name()));
         b.setJobCommandConfigMap(commandConfigMap);
       }
       if (cfg.containsKey(JobConfigProperty.Timeout.name())) {
@@ -483,8 +500,7 @@ public class JobConfig extends ResourceConfig {
             
Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name())));
       }
       if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) {
-        b.setFailureThreshold(
-            
Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name())));
+        
b.setFailureThreshold(Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name())));
       }
       if (cfg.containsKey(JobConfigProperty.TaskRetryDelay.name())) {
         
b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TaskRetryDelay.name())));
@@ -516,6 +532,9 @@ public class JobConfig extends ResourceConfig {
         b.setRebalanceRunningTask(
             
Boolean.valueOf(cfg.get(JobConfigProperty.RebalanceRunningTask.name())));
       }
+      if (cfg.containsKey(JobConfigProperty.QuotaType.name())) {
+        b.setQuotaType(cfg.get(JobConfigProperty.QuotaType.name()));
+      }
       return b;
     }
 
@@ -650,13 +669,18 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setQuotaType(String quotaType) {
+      _quotaType = quotaType;
+      return this;
+    }
+
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
             String.format("%s cannot be null", 
JobConfigProperty.TargetResource));
       }
-      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && 
_targetPartitionStates
-          .isEmpty()) {
+      if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
+          && _targetPartitionStates.isEmpty()) {
         throw new IllegalArgumentException(
             String.format("%s cannot be an empty set", 
JobConfigProperty.TargetPartitionStates));
       }
@@ -681,33 +705,28 @@ public class JobConfig extends ResourceConfig {
         }
       }
       if (_timeout < TaskConstants.DEFAULT_NEVER_TIMEOUT) {
-        throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", JobConfigProperty.Timeout, 
_timeout));
+        throw new IllegalArgumentException(
+            String.format("%s has invalid value %s", 
JobConfigProperty.Timeout, _timeout));
       }
       if (_timeoutPerTask < 0) {
-        throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", 
JobConfigProperty.TimeoutPerPartition,
-                _timeoutPerTask));
+        throw new IllegalArgumentException(String.format("%s has invalid value 
%s",
+            JobConfigProperty.TimeoutPerPartition, _timeoutPerTask));
       }
       if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", 
JobConfigProperty.ConcurrentTasksPerInstance,
-                _numConcurrentTasksPerInstance));
+        throw new IllegalArgumentException(String.format("%s has invalid value 
%s",
+            JobConfigProperty.ConcurrentTasksPerInstance, 
_numConcurrentTasksPerInstance));
       }
       if (_maxAttemptsPerTask < 1) {
-        throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", 
JobConfigProperty.MaxAttemptsPerTask,
-                _maxAttemptsPerTask));
+        throw new IllegalArgumentException(String.format("%s has invalid value 
%s",
+            JobConfigProperty.MaxAttemptsPerTask, _maxAttemptsPerTask));
       }
       if (_maxForcedReassignmentsPerTask < 0) {
-        throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", 
JobConfigProperty.MaxForcedReassignmentsPerTask,
-                _maxForcedReassignmentsPerTask));
+        throw new IllegalArgumentException(String.format("%s has invalid value 
%s",
+            JobConfigProperty.MaxForcedReassignmentsPerTask, 
_maxForcedReassignmentsPerTask));
       }
       if (_failureThreshold < 0) {
-        throw new IllegalArgumentException(String
-            .format("%s has invalid value %s", 
JobConfigProperty.FailureThreshold,
-                _failureThreshold));
+        throw new IllegalArgumentException(String.format("%s has invalid value 
%s",
+            JobConfigProperty.FailureThreshold, _failureThreshold));
       }
       if (_workflow == null) {
         throw new IllegalArgumentException(
@@ -720,14 +739,13 @@ public class JobConfig extends ResourceConfig {
 
       b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
           
.setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
-          .setTimeout(jobBean.timeout)
-          .setTimeoutPerTask(jobBean.timeoutPerPartition)
+          
.setTimeout(jobBean.timeout).setTimeoutPerTask(jobBean.timeoutPerPartition)
           
.setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
           .setDisableExternalView(jobBean.disableExternalView)
           .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure)
           
.setNumberOfTasks(jobBean.numberOfTasks).setExecutionDelay(jobBean.executionDelay)
           .setExecutionStart(jobBean.executionStart)
-          .setRebalanceRunningTask(jobBean.rebalanceRunningTask);
+          
.setRebalanceRunningTask(jobBean.rebalanceRunningTask).setQuotaType(jobBean.quotaType);
 
       if (jobBean.jobCommandConfigMap != null) {
         b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
@@ -765,4 +783,4 @@ public class JobConfig extends ResourceConfig {
       return Arrays.asList(vals);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index f9016d1..661615d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -37,13 +37,12 @@ import org.slf4j.LoggerFactory;
 /**
  * Provides a typed interface to workflow level configurations. Validates the 
configurations.
  */
-public class  WorkflowConfig extends ResourceConfig {
+public class WorkflowConfig extends ResourceConfig {
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowConfig.class);
 
   /**
    * Do not use these values directly, always use the getters/setters
    * in WorkflowConfig and WorkflowConfig.Builder.
-   *
    * For back-compatible, this class will be left for public for a while,
    * but it will be change to protected in future major release.
    */
@@ -66,8 +65,11 @@ public class  WorkflowConfig extends ResourceConfig {
     JobPurgeInterval,
     /* Allow multiple jobs in this workflow to be assigned to a same instance 
or not */
     AllowOverlapJobAssignment,
-    Timeout
-    }
+    Timeout,
+    /* Quota related fields */
+    QuotaType // Quota type for workflows is a syntactic sugar for setting
+    // all of the jobs to this quota type
+  }
 
   /* Default values */
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
@@ -80,10 +82,9 @@ public class  WorkflowConfig extends ResourceConfig {
   public static final boolean DEFAULT_JOB_QUEUE = false;
   public static final boolean DEFAULT_MONITOR_DISABLE = true;
   public static final boolean DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT = false;
-  protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; 
//default 30 minutes
+  protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; // 
default 30 minutes
   private JobDag _jobDag;
 
-
   public WorkflowConfig(HelixProperty property) {
     super(property.getRecord());
   }
@@ -92,7 +93,7 @@ public class  WorkflowConfig extends ResourceConfig {
     this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), 
cfg.getTargetState(), cfg.getExpiry(),
         cfg.getFailureThreshold(), cfg.isTerminable(), 
cfg.getScheduleConfig(), cfg.getCapacity(),
         cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), 
cfg.getJobPurgeInterval(),
-        cfg.isAllowOverlapJobAssignment(), cfg.getTimeout());
+        cfg.isAllowOverlapJobAssignment(), cfg.getTimeout(), 
cfg.getQuotaType());
   }
 
   /* Member variables */
@@ -102,7 +103,7 @@ public class  WorkflowConfig extends ResourceConfig {
       TargetState targetState, long expiry, int failureThreshold, boolean 
terminable,
       ScheduleConfig scheduleConfig, int capacity, String workflowType, 
boolean isJobQueue,
       Map<String, String> jobTypes, long purgeInterval, boolean 
allowOverlapJobAssignment,
-      long timeout) {
+      long timeout, String quotaType) {
     super(workflowId);
 
     putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
@@ -119,7 +120,8 @@ public class  WorkflowConfig extends ResourceConfig {
     putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(),
         String.valueOf(failureThreshold));
     putSimpleConfig(WorkflowConfigProperty.JobPurgeInterval.name(), 
String.valueOf(purgeInterval));
-    putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), 
String.valueOf(allowOverlapJobAssignment));
+    putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(),
+        String.valueOf(allowOverlapJobAssignment));
 
     if (capacity > 0) {
       putSimpleConfig(WorkflowConfigProperty.capacity.name(), 
String.valueOf(capacity));
@@ -152,6 +154,11 @@ public class  WorkflowConfig extends ResourceConfig {
     }
     putSimpleConfig(ResourceConfigProperty.MONITORING_DISABLED.toString(),
         String.valueOf(DEFAULT_MONITOR_DISABLE));
+
+    // Set the quota type for this workflow
+    if (quotaType != null) {
+      putSimpleConfig(WorkflowConfigProperty.QuotaType.name(), quotaType);
+    }
   }
 
   public String getWorkflowId() {
@@ -160,8 +167,9 @@ public class  WorkflowConfig extends ResourceConfig {
 
   public JobDag getJobDag() {
     if (_jobDag == null) {
-      _jobDag = simpleConfigContains(WorkflowConfigProperty.Dag.name()) ? 
JobDag
-          .fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : 
DEFAULT_JOB_DAG;
+      _jobDag = simpleConfigContains(WorkflowConfigProperty.Dag.name())
+          ? JobDag.fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name()))
+          : DEFAULT_JOB_DAG;
     }
     return _jobDag;
   }
@@ -175,13 +183,13 @@ public class  WorkflowConfig extends ResourceConfig {
   }
 
   public int getParallelJobs() {
-    return _record
-        .getIntField(WorkflowConfigProperty.ParallelJobs.name(), 
DEFAULT_PARALLEL_JOBS);
+    return _record.getIntField(WorkflowConfigProperty.ParallelJobs.name(), 
DEFAULT_PARALLEL_JOBS);
   }
 
   public TargetState getTargetState() {
-    return simpleConfigContains(WorkflowConfigProperty.TargetState.name()) ? 
TargetState
-        .valueOf(getSimpleConfig(WorkflowConfigProperty.TargetState.name())) : 
DEFAULT_TARGET_STATE;
+    return simpleConfigContains(WorkflowConfigProperty.TargetState.name())
+        ? 
TargetState.valueOf(getSimpleConfig(WorkflowConfigProperty.TargetState.name()))
+        : DEFAULT_TARGET_STATE;
   }
 
   public long getExpiry() {
@@ -189,8 +197,8 @@ public class  WorkflowConfig extends ResourceConfig {
   }
 
   public long getJobPurgeInterval() {
-    return _record
-        .getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), 
DEFAULT_JOB_PURGE_INTERVAL);
+    return _record.getLongField(WorkflowConfigProperty.JobPurgeInterval.name(),
+        DEFAULT_JOB_PURGE_INTERVAL);
   }
 
   /**
@@ -198,8 +206,8 @@ public class  WorkflowConfig extends ResourceConfig {
    * @return
    */
   public int getFailureThreshold() {
-    return _record
-        .getIntField(WorkflowConfigProperty.FailureThreshold.name(), 
DEFAULT_FAILURE_THRESHOLD);
+    return _record.getIntField(WorkflowConfigProperty.FailureThreshold.name(),
+        DEFAULT_FAILURE_THRESHOLD);
   }
 
   /**
@@ -212,8 +220,9 @@ public class  WorkflowConfig extends ResourceConfig {
   }
 
   public String getWorkflowType() {
-    return simpleConfigContains(WorkflowConfigProperty.WorkflowType.name()) ? 
getSimpleConfig(
-        WorkflowConfigProperty.WorkflowType.name()) : null;
+    return simpleConfigContains(WorkflowConfigProperty.WorkflowType.name())
+        ? getSimpleConfig(WorkflowConfigProperty.WorkflowType.name())
+        : null;
   }
 
   public boolean isTerminable() {
@@ -225,9 +234,9 @@ public class  WorkflowConfig extends ResourceConfig {
   }
 
   public boolean isRecurring() {
-    return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) && 
simpleConfigContains(
-        WorkflowConfigProperty.RecurrenceInterval.name()) && 
simpleConfigContains(
-        WorkflowConfigProperty.RecurrenceUnit.name());
+    return simpleConfigContains(WorkflowConfigProperty.StartTime.name())
+        && 
simpleConfigContains(WorkflowConfigProperty.RecurrenceInterval.name())
+        && simpleConfigContains(WorkflowConfigProperty.RecurrenceUnit.name());
   }
 
   public boolean isJobQueue() {
@@ -239,8 +248,9 @@ public class  WorkflowConfig extends ResourceConfig {
   }
 
   public Map<String, String> getJobTypes() {
-    return mapConfigContains(WorkflowConfigProperty.JobTypes.name()) ? 
getMapConfig(
-        WorkflowConfigProperty.JobTypes.name()) : null;
+    return mapConfigContains(WorkflowConfigProperty.JobTypes.name())
+        ? getMapConfig(WorkflowConfigProperty.JobTypes.name())
+        : null;
   }
 
   public boolean isAllowOverlapJobAssignment() {
@@ -249,13 +259,12 @@ public class  WorkflowConfig extends ResourceConfig {
   }
 
   public long getTimeout() {
-    return _record
-        .getLongField(WorkflowConfigProperty.Timeout.name(), 
TaskConstants.DEFAULT_NEVER_TIMEOUT);
+    return _record.getLongField(WorkflowConfigProperty.Timeout.name(),
+        TaskConstants.DEFAULT_NEVER_TIMEOUT);
   }
 
   public static SimpleDateFormat getDefaultDateFormat() {
-    SimpleDateFormat defaultDateFormat = new SimpleDateFormat(
-        "MM-dd-yyyy HH:mm:ss");
+    SimpleDateFormat defaultDateFormat = new SimpleDateFormat("MM-dd-yyyy 
HH:mm:ss");
     defaultDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
 
     return defaultDateFormat;
@@ -263,15 +272,13 @@ public class  WorkflowConfig extends ResourceConfig {
 
   /**
    * Get the scheduled start time of the workflow.
-   *
    * @return start time if the workflow schedule is set, null if no schedule 
config set.
    */
   public Date getStartTime() {
     // Workflow with non-scheduled config is ready to schedule immediately.
     try {
-      return simpleConfigContains(WorkflowConfigProperty.StartTime.name())
-          ? WorkflowConfig.getDefaultDateFormat()
-          .parse(getSimpleConfig(WorkflowConfigProperty.StartTime.name()))
+      return simpleConfigContains(WorkflowConfigProperty.StartTime.name()) ? 
WorkflowConfig
+          
.getDefaultDateFormat().parse(getSimpleConfig(WorkflowConfigProperty.StartTime.name()))
           : null;
     } catch (ParseException e) {
       LOG.error("Unparseable date " + 
getSimpleConfig(WorkflowConfigProperty.StartTime.name()), e);
@@ -285,7 +292,6 @@ public class  WorkflowConfig extends ResourceConfig {
 
   /**
    * Get a ScheduleConfig from a workflow config string map
-   *
    * @param cfg the string map
    * @return a ScheduleConfig if one exists, otherwise null
    */
@@ -297,24 +303,29 @@ public class  WorkflowConfig extends ResourceConfig {
         startTime = WorkflowConfig.getDefaultDateFormat()
             .parse(cfg.get(WorkflowConfigProperty.StartTime.name()));
       } catch (ParseException e) {
-        LOG.error(
-            "Unparseable date " + 
cfg.get(WorkflowConfigProperty.StartTime.name()),
-            e);
+        LOG.error("Unparseable date " + 
cfg.get(WorkflowConfigProperty.StartTime.name()), e);
         return null;
       }
     }
-    if (cfg.containsKey(WorkflowConfigProperty.RecurrenceUnit.name()) && cfg
-        .containsKey(WorkflowConfigProperty.RecurrenceInterval.name())) {
+    if (cfg.containsKey(WorkflowConfigProperty.RecurrenceUnit.name())
+        && cfg.containsKey(WorkflowConfigProperty.RecurrenceInterval.name())) {
       return ScheduleConfig.recurringFromDate(startTime,
           
TimeUnit.valueOf(cfg.get(WorkflowConfigProperty.RecurrenceUnit.name())),
-          Long.parseLong(
-              cfg.get(WorkflowConfigProperty.RecurrenceInterval.name())));
+          
Long.parseLong(cfg.get(WorkflowConfigProperty.RecurrenceInterval.name())));
     } else if (startTime != null) {
       return ScheduleConfig.oneTimeDelayedStart(startTime);
     }
     return null;
   }
 
+  /**
+   * Returns the quota type set for this workflow.
+   * @return quotaType string, null if quota type is not set
+   */
+  public String getQuotaType() {
+    return getSimpleConfig(WorkflowConfigProperty.QuotaType.name());
+  }
+
   public static WorkflowConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -341,16 +352,18 @@ public class  WorkflowConfig extends ResourceConfig {
     private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
     private boolean _allowOverlapJobAssignment = 
DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
     private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT;
+    private String _quotaType = null;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, 
_targetState, _expiry,
           _failureThreshold, _isTerminable, _scheduleConfig, _capacity, 
_workflowType, _isJobQueue,
-          _jobTypes, _jobPurgeInterval, _allowOverlapJobAssignment, _timeout);
+          _jobTypes, _jobPurgeInterval, _allowOverlapJobAssignment, _timeout, 
_quotaType);
     }
 
-    public Builder() {}
+    public Builder() {
+    }
 
     public Builder(String workflowId) {
       _workflowId = workflowId;
@@ -372,6 +385,7 @@ public class  WorkflowConfig extends ResourceConfig {
       _jobPurgeInterval = workflowConfig.getJobPurgeInterval();
       _allowOverlapJobAssignment = 
workflowConfig.isAllowOverlapJobAssignment();
       _timeout = workflowConfig.getTimeout();
+      _quotaType = workflowConfig.getQuotaType();
     }
 
     public Builder setWorkflowId(String v) {
@@ -397,10 +411,8 @@ public class  WorkflowConfig extends ResourceConfig {
     /**
      * The expiry time for this workflow. Helix may clean up the workflow 
information after the
      * expiry time from the completion of the workflow.
-     *
      * @param v
      * @param unit
-     *
      * @return
      */
     public Builder setExpiry(long v, TimeUnit unit) {
@@ -411,9 +423,7 @@ public class  WorkflowConfig extends ResourceConfig {
     /**
      * The expiry time for this workflow. Helix may clean up the workflow 
information after the
      * expiry time from the completion of the workflow.
-     *
      * @param v in milliseconds
-     *
      * @return
      */
     public Builder setExpiry(long v) {
@@ -424,9 +434,7 @@ public class  WorkflowConfig extends ResourceConfig {
     /**
      * The time periodical Helix should clean up all completed jobs. This 
config applies only on
      * JobQueue.
-     *
      * @param t in milliseconds
-     *
      * @return
      */
     public Builder setJobPurgeInterval(long t) {
@@ -436,9 +444,7 @@ public class  WorkflowConfig extends ResourceConfig {
 
     /**
      * The max allowed numbers of failed jobs before Helix should marks the 
workflow failure.
-     *
      * @param failureThreshold
-     *
      * @return
      */
     public Builder setFailureThreshold(int failureThreshold) {
@@ -483,7 +489,7 @@ public class  WorkflowConfig extends ResourceConfig {
 
     /**
      * ONLY generic workflow can be timeouted. JobQueue does not allow to be 
timeouted.
-     * @param timeout  The timeout period
+     * @param timeout The timeout period
      * @return
      */
     public Builder setTimeout(long timeout) {
@@ -502,6 +508,17 @@ public class  WorkflowConfig extends ResourceConfig {
       return this;
     }
 
+    /**
+     * Set the quota type for this workflow. If a workflow has a quota type 
set,
+     * all of its jobs will be of that quota type.
+     * @param quotaType
+     * @return
+     */
+    public Builder setQuotaType(String quotaType) {
+      _quotaType = quotaType;
+      return this;
+    }
+
     @Deprecated
     public static Builder fromMap(Map<String, String> cfg) {
       Builder builder = new Builder();
@@ -586,6 +603,10 @@ public class  WorkflowConfig extends ResourceConfig {
         
setTimeout(Long.parseLong(cfg.get(WorkflowConfigProperty.Timeout.name())));
       }
 
+      if (cfg.containsKey(WorkflowConfigProperty.QuotaType.name())) {
+        setQuotaType(cfg.get(WorkflowConfigProperty.QuotaType.name()));
+      }
+
       return this;
     }
 
@@ -635,6 +656,7 @@ public class  WorkflowConfig extends ResourceConfig {
         b.setScheduleConfig(ScheduleConfig.from(workflowBean.schedule));
       }
       b.setExpiry(workflowBean.expiry);
+      b.setQuotaType(workflowBean.quotaType);
 
       return b;
     }
@@ -643,8 +665,8 @@ public class  WorkflowConfig extends ResourceConfig {
       _taskDag.validate();
 
       if (_expiry < 0) {
-        throw new HelixException(String
-            .format("%s has invalid value %s", 
WorkflowConfigProperty.Expiry.name(), _expiry));
+        throw new HelixException(String.format("%s has invalid value %s",
+            WorkflowConfigProperty.Expiry.name(), _expiry));
       } else if (_scheduleConfig != null && !_scheduleConfig.isValid()) {
         throw new HelixException(
             "Scheduler configuration is invalid. The configuration must have a 
start time if it is "

http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java 
b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 5674d92..e0f1a70 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -51,4 +51,5 @@ public class JobBean {
   public boolean ignoreDependentJobFailure = 
JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
   public int numberOfTasks = JobConfig.DEFAULT_NUMBER_OF_TASKS;
   public boolean rebalanceRunningTask = 
JobConfig.DEFAULT_REBALANCE_RUNNING_TASK;
+  public String quotaType;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/35fcfa0e/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java 
b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 2a9563e..ae01f5d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -32,4 +32,5 @@ public class WorkflowBean {
   public ScheduleBean schedule;
   public long expiry = WorkflowConfig.DEFAULT_EXPIRY;
   public String workflowType;
+  public String quotaType; // Syntactic sugar for setting all of workflow's 
jobs to this quota type
 }

Reply via email to