This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee4dca  [GOBBLIN-938] Make job-template resolution available in all JobLaunchers
9ee4dca is described below

commit 9ee4dcaf66257b6e2926cf1470b16b912cd343ff
Author: autumnust <[email protected]>
AuthorDate: Thu Oct 31 07:20:48 2019 -0700

    [GOBBLIN-938] Make job-template resolution available in all JobLaunchers
    
    Closes #2787 from autumnust/ETL-9696
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  1 -
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java | 56 ++++++++--------------
 .../gobblin/runtime/AbstractJobLauncher.java       | 36 +++++++++++++-
 .../runtime/{local => }/LocalJobLauncherTest.java  | 39 ++++++++++++++-
 .../service/modules/spec/JobExecutionPlan.java     |  5 +-
 .../template_catalog/FSFlowTemplateCatalog.java    |  7 +--
 6 files changed, 101 insertions(+), 43 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 80cbe58..e820c1f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -151,7 +151,6 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     this.stateSerDeRunnerThreads = 
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
         Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
-
     jobConfig = ConfigUtils.propertiesToConfig(jobProps);
 
     this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig,
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 6b99f81..e576468 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.azkaban;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -32,29 +31,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-
-import azkaban.jobExecutor.AbstractJob;
-import javax.annotation.Nullable;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.configuration.State;
@@ -77,7 +53,27 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.TimeRangeChecker;
 import org.apache.gobblin.util.hadoop.TokenUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
 
+import azkaban.jobExecutor.AbstractJob;
+import javax.annotation.Nullable;
+
+import static 
org.apache.gobblin.runtime.AbstractJobLauncher.resolveGobblinJobTemplateIfNecessary;
 import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
 
 
@@ -104,7 +100,6 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
 
   public static final String GOBBLIN_LOG_LEVEL_KEY = 
"gobblin.log.levelOverride";
   public static final String GOBBLIN_CUSTOM_JOB_LISTENERS = 
"gobblin.custom.job.listeners";
-  public static final String TEMPLATE_KEY = "gobblin.template.uri";
 
   private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
   private static final String AZKABAN_LINK_JOBEXEC_URL = 
"azkaban.link.jobexec.url";
@@ -200,16 +195,7 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
     }
 
     Properties jobProps = this.props;
-    if (jobProps.containsKey(TEMPLATE_KEY)) {
-      Config config = ConfigUtils.propertiesToConfig(jobProps);
-      JobSpecResolver resolver = JobSpecResolver.builder(config).build();
-
-      URI templateUri = new URI(jobProps.getProperty(TEMPLATE_KEY));
-      JobSpec jobSpec = 
JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
-      ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
-      jobProps = ConfigUtils.configToProperties(resolvedJob.getConfig());
-    }
-
+    resolveGobblinJobTemplateIfNecessary(jobProps);
     GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
 
     // If the job launcher type is not specified in the job configuration,
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 1598848..d9241c5 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.runtime;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -26,6 +28,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -40,6 +48,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.io.Closer;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import javax.annotation.Nullable;
@@ -88,6 +97,7 @@ import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
+
 /**
  * An abstract implementation of {@link JobLauncher} that handles common tasks 
for launching and running a job.
  *
@@ -104,6 +114,8 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
   public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
 
+  public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
+
   // Job configuration properties
   protected final Properties jobProps;
 
@@ -160,9 +172,10 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     clusterNameTags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
     GobblinMetrics.addCustomTagsToProperties(jobProps, clusterNameTags);
 
-    // Make a copy for both the system and job configuration properties
+    // Make a copy for both the system and job configuration properties and 
resolve the job-template if any.
     this.jobProps = new Properties();
     this.jobProps.putAll(jobProps);
+    resolveGobblinJobTemplateIfNecessary(this.jobProps);
 
     if (!tryLockJob(this.jobProps)) {
       throw new JobException(String.format("Previous instance of job %s is 
still running, skipping this scheduled run",
@@ -213,6 +226,27 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     }
   }
 
+
+  /**
+   * To supporting 'gobblin.template.uri' in any types of jobLauncher, place 
this resolution as a public-static method
+   * to make it accessible for all implementation of JobLauncher and 
**AzkabanJobLauncher**.
+   *
+   * @param jobProps Gobblin Job-level properties.
+   */
+  public static void resolveGobblinJobTemplateIfNecessary(Properties jobProps) 
throws IOException, URISyntaxException,
+                                                                               
       SpecNotFoundException,
+                                                                               
       JobTemplate.TemplateException {
+    if (jobProps.containsKey(GOBBLIN_JOB_TEMPLATE_KEY)) {
+      Config config = ConfigUtils.propertiesToConfig(jobProps);
+      JobSpecResolver resolver = JobSpecResolver.builder(config).build();
+
+      URI templateUri = new 
URI(jobProps.getProperty(GOBBLIN_JOB_TEMPLATE_KEY));
+      JobSpec jobSpec = 
JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
+      ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
+      jobProps.putAll(ConfigUtils.configToProperties(resolvedJob.getConfig()));
+    }
+  }
+
   private static SharedResourcesBroker<GobblinScopeTypes> 
createDefaultInstanceBroker(Properties jobProps) {
     LOG.warn("Creating a job specific {}. Objects will only be shared at the 
job level.",
         SharedResourcesBroker.class.getSimpleName());
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
similarity index 87%
rename from 
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
rename to 
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index a81aec6..3715082 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -15,12 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.local;
+package org.apache.gobblin.runtime;
 
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobLauncherFactory;
+import org.apache.gobblin.runtime.local.LocalJobLauncher;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -38,6 +45,10 @@ import org.apache.gobblin.util.limiter.DefaultLimiterFactory;
 import org.apache.gobblin.writer.Destination;
 import org.apache.gobblin.writer.WriterOutputFormat;
 
+import com.google.common.io.Closer;
+
+import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
+
 
 /**
  * Unit test for {@link LocalJobLauncher}.
@@ -82,6 +93,32 @@ public class LocalJobLauncherTest {
   }
 
   @Test
+  public void testJobTemplateResolutionInAbstractLauncher() throws Exception {
+    Properties jobProps = loadJobProps();
+    String jobId = JobLauncherUtils.newJobId("beforeResolution");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+    jobProps.setProperty("job.name", "beforeResolution");
+    jobProps.setProperty(GOBBLIN_JOB_TEMPLATE_KEY, 
"resource:///templates/distcp-ng.template");
+
+
+    JobContext jobContext = null;
+    Closer closer = Closer.create();
+    try {
+      JobLauncher jobLauncher = 
closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, 
jobProps));
+      jobContext = ((AbstractJobLauncher) jobLauncher).getJobContext();
+    } finally {
+      closer.close();
+    }
+
+    // Indicating resolution succeeded.
+    // 1) User config not being overloaded by template
+    // 2) Conf that not appearing in the user-config being populated by 
template
+    System.out.println(jobContext.getJobState());
+    Assert.assertEquals(jobContext.getJobState().getProp("job.name"), 
"beforeResolution");
+    
Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), 
"/tmp/distcp-persist-dir");
+  }
+
+  @Test
   public void testLaunchJobWithPullLimit() throws Exception {
     int limit = 10;
     Properties jobProps = loadJobProps();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 82d2bae..027bcc6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -46,9 +46,10 @@ import org.apache.gobblin.service.ExecutionStatus;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
+
 
 /**
  * A data class that encapsulates information for executing a job. This 
includes a {@link JobSpec} and a {@link SpecExecutor}
@@ -120,7 +121,7 @@ public class JobExecutionPlan {
       
jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
 
       //Remove template uri
-      
jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowTemplateCatalog.JOB_TEMPLATE_KEY));
+      
jobSpec.setConfig(jobSpec.getConfig().withoutPath(GOBBLIN_JOB_TEMPLATE_KEY));
 
       // Add job.name and job.group
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef(jobName)));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index e01f90a..24e3ff2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -47,6 +47,8 @@ import 
org.apache.gobblin.service.modules.template.FlowTemplate;
 import 
org.apache.gobblin.service.modules.template.HOCONInputStreamFlowTemplate;
 import org.apache.gobblin.util.PathUtils;
 
+import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
+
 
 /**
  * An implementation of a catalog for {@link FlowTemplate}s. Provides basic 
API for retrieving a {@link FlowTemplate}
@@ -67,7 +69,6 @@ public class FSFlowTemplateCatalog extends FSJobCatalog 
implements FlowCatalogWi
   public static final String JOBS_DIR_NAME = "jobs";
   public static final String FLOW_CONF_FILE_NAME = "flow.conf";
   public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", 
".template");
-  public static final String JOB_TEMPLATE_KEY = "gobblin.template.uri";
 
   protected static final String FS_SCHEME = "FS";
 
@@ -145,8 +146,8 @@ public class FSFlowTemplateCatalog extends FSJobCatalog 
implements FlowCatalogWi
     for (FileStatus fileStatus : fs.listStatus(jobFilePath, extensionFilter)) {
       Config jobConfig = loadHoconFileAtPath(fileStatus.getPath());
       //Check if the .job file has an underlying job template
-      if (jobConfig.hasPath(JOB_TEMPLATE_KEY)) {
-        URI jobTemplateRelativeUri = new 
URI(jobConfig.getString(JOB_TEMPLATE_KEY));
+      if (jobConfig.hasPath(GOBBLIN_JOB_TEMPLATE_KEY)) {
+        URI jobTemplateRelativeUri = new 
URI(jobConfig.getString(GOBBLIN_JOB_TEMPLATE_KEY));
         if (!jobTemplateRelativeUri.getScheme().equals(FS_SCHEME)) {
           throw new RuntimeException(
               "Expected scheme " + FS_SCHEME + " got unsupported scheme " + 
flowTemplateDirURI.getScheme());

Reply via email to