Repository: incubator-gobblin Updated Branches: refs/heads/master b11cfa8b7 -> 69c65f842
[GOBBLIN-655] Allow helix job to have a job type. Closes #2524 from kyuamazon/jobtype Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/69c65f84 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/69c65f84 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/69c65f84 Branch: refs/heads/master Commit: 69c65f842504f2ffadeb19515c9c4810b8bf558c Parents: b11cfa8 Author: Kuai Yu <[email protected]> Authored: Fri Dec 14 10:08:13 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Dec 14 10:08:13 2018 -0800 ---------------------------------------------------------------------- .../cluster/GobblinClusterConfigurationKeys.java | 6 +++++- .../GobblinHelixDistributeJobExecutionLauncher.java | 7 +++++++ .../gobblin/cluster/GobblinHelixJobLauncher.java | 14 ++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index b2bd682..6791d52 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -75,11 +75,15 @@ public class GobblinClusterConfigurationKeys { public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread"; public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true; - // Helix related tagging + // Helix tagging public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag"; public static final String HELIX_PLANNING_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag"; public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags"; + // Helix job quota + public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobType"; + public static final String HELIX_PLANNING_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobType"; + // Planning job properties public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob"; public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning."; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index bc8443d..9424ca8 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -223,6 +223,13 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher jobConfigBuilder.setInstanceGroupTag(jobPlanningTag); } + // Planning job should have its own type support + if (jobProps.containsKey(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY)) { + String jobType = jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_PLANNING_JOB_TYPE_KEY); + log.info("PlanningJob {} has types associated : {}", planningId, jobType); + jobConfigBuilder.setJobType(jobType); + } + jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps, GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY, GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/69c65f84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 3389f84..732c7d3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -321,6 +321,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfigBuilder.setInstanceGroupTag(jobTag); } + if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY)) { + String jobType = this.jobConfig.getString(this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TYPE_KEY)); + log.info("Job {} has types associated : {}", this.jobContext.getJobId(), jobType); + jobConfigBuilder.setJobType(jobType); + } + if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) { jobConfigBuilder.setRebalanceRunningTask(true); } @@ -335,8 +341,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { * Submit a job to run. */ private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception { - HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, this.jobContext.getJobId(), - this.helixTaskDriver, this.helixManager, this.workFlowExpiryTimeSeconds); + HelixUtils.submitJobToWorkFlow(jobConfigBuilder, + this.helixWorkFlowName, + this.jobContext.getJobId(), + this.helixTaskDriver, + this.helixManager, + this.workFlowExpiryTimeSeconds); } public void launchJob(@Nullable JobListener jobListener)
