This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9ee4dca [GOBBLIN-938] Make job-template resolution available in all
JobLaunchers
9ee4dca is described below
commit 9ee4dcaf66257b6e2926cf1470b16b912cd343ff
Author: autumnust <[email protected]>
AuthorDate: Thu Oct 31 07:20:48 2019 -0700
[GOBBLIN-938] Make job-template resolution available in all JobLaunchers
Closes #2787 from autumnust/ETL-9696
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 1 -
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 56 ++++++++--------------
.../gobblin/runtime/AbstractJobLauncher.java | 36 +++++++++++++-
.../runtime/{local => }/LocalJobLauncherTest.java | 39 ++++++++++++++-
.../service/modules/spec/JobExecutionPlan.java | 5 +-
.../template_catalog/FSFlowTemplateCatalog.java | 7 +--
6 files changed, 101 insertions(+), 43 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 80cbe58..e820c1f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -151,7 +151,6 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
this.stateSerDeRunnerThreads =
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
-
jobConfig = ConfigUtils.propertiesToConfig(jobProps);
this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig,
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 6b99f81..e576468 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.azkaban;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -32,29 +31,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-
-import azkaban.jobExecutor.AbstractJob;
-import javax.annotation.Nullable;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.configuration.State;
@@ -77,7 +53,27 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.TimeRangeChecker;
import org.apache.gobblin.util.hadoop.TokenUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import azkaban.jobExecutor.AbstractJob;
+import javax.annotation.Nullable;
+
+import static
org.apache.gobblin.runtime.AbstractJobLauncher.resolveGobblinJobTemplateIfNecessary;
import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
@@ -104,7 +100,6 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
public static final String GOBBLIN_LOG_LEVEL_KEY =
"gobblin.log.levelOverride";
public static final String GOBBLIN_CUSTOM_JOB_LISTENERS =
"gobblin.custom.job.listeners";
- public static final String TEMPLATE_KEY = "gobblin.template.uri";
private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
private static final String AZKABAN_LINK_JOBEXEC_URL =
"azkaban.link.jobexec.url";
@@ -200,16 +195,7 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
}
Properties jobProps = this.props;
- if (jobProps.containsKey(TEMPLATE_KEY)) {
- Config config = ConfigUtils.propertiesToConfig(jobProps);
- JobSpecResolver resolver = JobSpecResolver.builder(config).build();
-
- URI templateUri = new URI(jobProps.getProperty(TEMPLATE_KEY));
- JobSpec jobSpec =
JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
- ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
- jobProps = ConfigUtils.configToProperties(resolvedJob.getConfig());
- }
-
+ resolveGobblinJobTemplateIfNecessary(jobProps);
GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
// If the job launcher type is not specified in the job configuration,
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 1598848..d9241c5 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.runtime;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -26,6 +28,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -40,6 +48,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import javax.annotation.Nullable;
@@ -88,6 +97,7 @@ import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+
/**
* An abstract implementation of {@link JobLauncher} that handles common tasks
for launching and running a job.
*
@@ -104,6 +114,8 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
+ /**
+ * Job-property key whose value is the URI of a job template; when present, the template is resolved against
+ * the job's own properties by {@link #resolveGobblinJobTemplateIfNecessary(Properties)}.
+ */
+ public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
+
// Job configuration properties
protected final Properties jobProps;
@@ -160,9 +172,10 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
clusterNameTags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
GobblinMetrics.addCustomTagsToProperties(jobProps, clusterNameTags);
- // Make a copy for both the system and job configuration properties
+ // Make a copy for both the system and job configuration properties and
resolve the job-template if any.
this.jobProps = new Properties();
this.jobProps.putAll(jobProps);
+ resolveGobblinJobTemplateIfNecessary(this.jobProps);
if (!tryLockJob(this.jobProps)) {
throw new JobException(String.format("Previous instance of job %s is
still running, skipping this scheduled run",
@@ -213,6 +226,27 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
}
+
+ /**
+ * Resolves the job template referenced by {@code gobblin.template.uri}, if that key is present, into
+ * {@code jobProps}. If the key is absent, {@code jobProps} is left untouched.
+ *
+ * This is public and static so that every {@link JobLauncher} implementation (via the
+ * {@code AbstractJobLauncher} constructor) and {@code AzkabanJobLauncher}, which extends Azkaban's
+ * {@code AbstractJob} rather than this class, share one implementation of template resolution.
+ *
+ * @param jobProps Gobblin job-level properties; modified in place. Every entry of the resolved job config is
+ * written into it, replacing any existing value for the same key; keys not present in the resolved
+ * config are kept as they were.
+ * @throws URISyntaxException if the value of {@code gobblin.template.uri} is not a valid URI.
+ * @throws IOException propagated from building the {@link JobSpecResolver} or resolving the {@link JobSpec}.
+ * @throws SpecNotFoundException propagated from {@link JobSpecResolver#resolveJobSpec}; presumably raised when
+ * the template cannot be located (TODO confirm).
+ * @throws JobTemplate.TemplateException propagated from {@link JobSpecResolver#resolveJobSpec}.
+ */
+ public static void resolveGobblinJobTemplateIfNecessary(Properties jobProps)
throws IOException, URISyntaxException,
+
SpecNotFoundException,
+
JobTemplate.TemplateException {
+ if (jobProps.containsKey(GOBBLIN_JOB_TEMPLATE_KEY)) {
+ // Build the resolver and the spec from a Config snapshot of the current properties.
+ Config config = ConfigUtils.propertiesToConfig(jobProps);
+ JobSpecResolver resolver = JobSpecResolver.builder(config).build();
+
+ URI templateUri = new
URI(jobProps.getProperty(GOBBLIN_JOB_TEMPLATE_KEY));
+ JobSpec jobSpec =
JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
+ ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
+ // Merge into the caller's Properties instead of returning a new object: callers pass their own reference
+ // (e.g. this.jobProps in the constructor) and ignore any return value, so in-place update is required.
+ jobProps.putAll(ConfigUtils.configToProperties(resolvedJob.getConfig()));
+ }
+ }
+
private static SharedResourcesBroker<GobblinScopeTypes>
createDefaultInstanceBroker(Properties jobProps) {
LOG.warn("Creating a job specific {}. Objects will only be shared at the
job level.",
SharedResourcesBroker.class.getSimpleName());
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
similarity index 87%
rename from
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
rename to
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index a81aec6..3715082 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -15,12 +15,19 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.local;
+package org.apache.gobblin.runtime;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobLauncherFactory;
+import org.apache.gobblin.runtime.local.LocalJobLauncher;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -38,6 +45,10 @@ import org.apache.gobblin.util.limiter.DefaultLimiterFactory;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.WriterOutputFormat;
+import com.google.common.io.Closer;
+
+import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
+
/**
* Unit test for {@link LocalJobLauncher}.
@@ -82,6 +93,32 @@ public class LocalJobLauncherTest {
}
@Test
+ public void testJobTemplateResolutionInAbstractLauncher() throws Exception {
+ Properties jobProps = loadJobProps();
+ String jobId = JobLauncherUtils.newJobId("beforeResolution");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+ jobProps.setProperty("job.name", "beforeResolution");
+ // The template is expected to supply distcp.persist.dir, which the user config does not set.
+ jobProps.setProperty(GOBBLIN_JOB_TEMPLATE_KEY, "resource:///templates/distcp-ng.template");
+
+ // Template resolution happens in the AbstractJobLauncher constructor, so constructing the launcher is
+ // enough; the job itself is never launched.
+ JobContext jobContext = null;
+ Closer closer = Closer.create();
+ try {
+ JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
+ jobContext = ((AbstractJobLauncher) jobLauncher).getJobContext();
+ } finally {
+ closer.close();
+ }
+
+ // Resolution succeeded if:
+ // 1) a key set by the user is not overridden by the template, and
+ // 2) a key absent from the user config is populated from the template.
+ // org.junit.Assert takes (expected, actual), so the expected value is passed first.
+ Assert.assertEquals("beforeResolution", jobContext.getJobState().getProp("job.name"));
+ Assert.assertEquals("/tmp/distcp-persist-dir", jobContext.getJobState().getProp("distcp.persist.dir"));
+ }
+
+ @Test
public void testLaunchJobWithPullLimit() throws Exception {
int limit = 10;
Properties jobProps = loadJobProps();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 82d2bae..027bcc6 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -46,9 +46,10 @@ import org.apache.gobblin.service.ExecutionStatus;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
+import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
+
/**
* A data class that encapsulates information for executing a job. This
includes a {@link JobSpec} and a {@link SpecExecutor}
@@ -120,7 +121,7 @@ public class JobExecutionPlan {
jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
//Remove template uri
-
jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowTemplateCatalog.JOB_TEMPLATE_KEY));
+
jobSpec.setConfig(jobSpec.getConfig().withoutPath(GOBBLIN_JOB_TEMPLATE_KEY));
// Add job.name and job.group
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY,
ConfigValueFactory.fromAnyRef(jobName)));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index e01f90a..24e3ff2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -47,6 +47,8 @@ import
org.apache.gobblin.service.modules.template.FlowTemplate;
import
org.apache.gobblin.service.modules.template.HOCONInputStreamFlowTemplate;
import org.apache.gobblin.util.PathUtils;
+import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
+
/**
* An implementation of a catalog for {@link FlowTemplate}s. Provides basic
API for retrieving a {@link FlowTemplate}
@@ -67,7 +69,6 @@ public class FSFlowTemplateCatalog extends FSJobCatalog
implements FlowCatalogWi
public static final String JOBS_DIR_NAME = "jobs";
public static final String FLOW_CONF_FILE_NAME = "flow.conf";
public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job",
".template");
- public static final String JOB_TEMPLATE_KEY = "gobblin.template.uri";
protected static final String FS_SCHEME = "FS";
@@ -145,8 +146,8 @@ public class FSFlowTemplateCatalog extends FSJobCatalog
implements FlowCatalogWi
for (FileStatus fileStatus : fs.listStatus(jobFilePath, extensionFilter)) {
Config jobConfig = loadHoconFileAtPath(fileStatus.getPath());
//Check if the .job file has an underlying job template
- if (jobConfig.hasPath(JOB_TEMPLATE_KEY)) {
- URI jobTemplateRelativeUri = new
URI(jobConfig.getString(JOB_TEMPLATE_KEY));
+ if (jobConfig.hasPath(GOBBLIN_JOB_TEMPLATE_KEY)) {
+ URI jobTemplateRelativeUri = new
URI(jobConfig.getString(GOBBLIN_JOB_TEMPLATE_KEY));
if (!jobTemplateRelativeUri.getScheme().equals(FS_SCHEME)) {
throw new RuntimeException(
"Expected scheme " + FS_SCHEME + " got unsupported scheme " +
flowTemplateDirURI.getScheme());