Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b11cfa8b7 -> 69c65f842


[GOBBLIN-655] Allow helix job to have a job type.

Closes #2524 from kyuamazon/jobtype


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/69c65f84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/69c65f84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/69c65f84

Branch: refs/heads/master
Commit: 69c65f842504f2ffadeb19515c9c4810b8bf558c
Parents: b11cfa8
Author: Kuai Yu <[email protected]>
Authored: Fri Dec 14 10:08:13 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Fri Dec 14 10:08:13 2018 -0800

----------------------------------------------------------------------
 .../cluster/GobblinClusterConfigurationKeys.java      |  6 +++++-
 .../GobblinHelixDistributeJobExecutionLauncher.java   |  7 +++++++
 .../gobblin/cluster/GobblinHelixJobLauncher.java      | 14 ++++++++++++--
 3 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b2bd682..6791d52 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -75,11 +75,15 @@ public class GobblinClusterConfigurationKeys {
   public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = 
GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread";
   public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true;
 
-  // Helix related tagging
+  // Helix tagging
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobTag";
   public static final String HELIX_PLANNING_JOB_TAG_KEY = 
GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "helixInstanceTags";
 
+  // Helix job quota
+  public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobType";
+  public static final String HELIX_PLANNING_JOB_TYPE_KEY = 
GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobType";
+
   // Planning job properties
   public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob";
   public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + 
"planning.";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index bc8443d..9424ca8 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -223,6 +223,13 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
       jobConfigBuilder.setInstanceGroupTag(jobPlanningTag);
     }
 
+    // Planning job should have its own type support
+    if 
(jobProps.containsKey(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY))
 {
+      String jobType = 
jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY);
+      log.info("PlanningJob {} has types associated : {}", planningId, 
jobType);
+      jobConfigBuilder.setJobType(jobType);
+    }
+
     
jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps,
         GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
         
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 3389f84..732c7d3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -321,6 +321,12 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       jobConfigBuilder.setInstanceGroupTag(jobTag);
     }
 
+    if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY)) {
+      String jobType = 
this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY);
+      log.info("Job {} has types associated : {}", this.jobContext.getJobId(), 
jobType);
+      jobConfigBuilder.setJobType(jobType);
+    }
+
     if 
(Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING))
 {
       jobConfigBuilder.setRebalanceRunningTask(true);
     }
@@ -335,8 +341,12 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
    * Submit a job to run.
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws 
Exception {
-    HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, 
this.jobContext.getJobId(),
-        this.helixTaskDriver, this.helixManager, 
this.workFlowExpiryTimeSeconds);
+    HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
+        this.helixWorkFlowName,
+        this.jobContext.getJobId(),
+        this.helixTaskDriver,
+        this.helixManager,
+        this.workFlowExpiryTimeSeconds);
   }
 
   public void launchJob(@Nullable JobListener jobListener)

Reply via email to