Repository: incubator-gobblin Updated Branches: refs/heads/master 7d970606b -> a9c9f781f
[GOBBLIN-282] Azkaban templates Closes #2135 from ibuenros/azkaban-templates Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a9c9f781 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a9c9f781 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a9c9f781 Branch: refs/heads/master Commit: a9c9f781f43bef85f96178999e266bcb5b5fb3ff Parents: 7d97060 Author: ibuenros <[email protected]> Authored: Wed Oct 11 08:27:25 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Wed Oct 11 08:27:25 2017 -0700 ---------------------------------------------------------------------- .../gobblin/azkaban/AzkabanJobLauncher.java | 25 +++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a9c9f781/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index bdbb04f..20b630b 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -19,6 +19,7 @@ package org.apache.gobblin.azkaban; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; import java.util.Properties; import java.util.Set; @@ -30,6 +31,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator; +import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -42,6 +45,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Closer; +import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -88,6 +92,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class); public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride"; + public static final String TEMPLATE_KEY = "gobblin.template.uri"; private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name"; private static final String AZKABAN_LINK_JOBEXEC_URL = "azkaban.link.jobexec.url"; @@ -158,30 +163,38 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath()); } + Properties jobProps = this.props; + if (jobProps.containsKey(TEMPLATE_KEY)) { + URI templateUri = new URI(jobProps.getProperty(TEMPLATE_KEY)); + Config resolvedJob = new PackagedTemplatesJobCatalogDecorator().getTemplate(templateUri) + .getResolvedConfig(ConfigUtils.propertiesToConfig(jobProps)); + jobProps = ConfigUtils.configToProperties(resolvedJob); + } + List<Tag<?>> tags = Lists.newArrayList(); tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags())); RootMetricContext.get(tags); - GobblinMetrics.addCustomTagsToProperties(this.props, tags); + GobblinMetrics.addCustomTagsToProperties(jobProps, tags); // If the job launcher type is not specified in the job configuration, // override the default to use the MAPREDUCE launcher. - if (!this.props.containsKey(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY)) { - this.props.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY, + if (!jobProps.containsKey(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY)) { + jobProps.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY, JobLauncherFactory.JobLauncherType.MAPREDUCE.toString()); } this.ownAzkabanSla = Long.parseLong( - this.props.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS)); + jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS)); // Create a JobLauncher instance depending on the configuration. The same properties object is // used for both system and job configuration properties because Azkaban puts configuration // properties in the .job file and in the .properties file into the same Properties object. - this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(this.props, this.props)); + this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps)); // Since Java classes cannot extend multiple classes and Azkaban jobs must extend AbstractJob, we must use composition // verses extending ServiceBasedAppLauncher this.applicationLauncher = - this.closer.register(new ServiceBasedAppLauncher(this.props, "Azkaban-" + UUID.randomUUID())); + this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID())); } @Override
