Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2631c5ae6 -> 94bdc6f39


[GOBBLIN-636] Use FS scheme and relative URIs for specifying job template locations in GaaS.

Closes #2506 from sv2000/templateUri


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/94bdc6f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/94bdc6f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/94bdc6f3

Branch: refs/heads/master
Commit: 94bdc6f3932b4b55b7f4b26d869c594b6f88bb35
Parents: 2631c5a
Author: suvasude <[email protected]>
Authored: Tue Nov 27 14:15:06 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Nov 27 14:15:06 2018 -0800

----------------------------------------------------------------------
 .../service/modules/spec/JobExecutionPlan.java  |  4 +++
 .../modules/template_catalog/FSFlowCatalog.java | 36 ++++++++++++--------
 .../flowEdgeTemplate/jobs/job1.job              |  2 +-
 .../flowEdgeTemplate/jobs/job2.job              |  2 +-
 .../flowEdgeTemplate/jobs/job3.job              |  2 +-
 .../flowEdgeTemplate/jobs/job4.job              |  2 +-
 .../jobs/hdfs-encrypt-avro-to-json.job          |  2 +-
 .../jobs/hdfs-snapshot-retention.job            |  2 +-
 .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job       |  2 +-
 .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job     |  2 +-
 .../localToHdfs/jobs/distcp-local-to-hdfs.job   |  2 +-
 11 files changed, 34 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 7517c17..685513c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -37,6 +37,7 @@ import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -103,6 +104,9 @@ public class JobExecutionPlan {
       // Remove schedule
       
jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
 
+      //Remove template uri
+      
jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowCatalog.JOB_TEMPLATE_KEY));
+
       // Add job.name and job.group
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef(jobName)));
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY,
 ConfigValueFactory.fromAnyRef(flowGroup)));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
index 59c3c87..e1f2b6a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
@@ -107,7 +107,7 @@ public class FSFlowCatalog extends FSJobCatalog implements 
FlowCatalogWithTempla
 
   /**
    *
-   * @param flowTemplateDirURI URI of the flow template directory
+   * @param flowTemplateDirURI Relative URI of the flow template directory
    * @return a list of {@link JobTemplate}s for a given flow identified by its 
{@link URI}.
    * @throws IOException
    * @throws SpecNotFoundException
@@ -135,21 +135,27 @@ public class FSFlowCatalog extends FSJobCatalog 
implements FlowCatalogWithTempla
     List<JobTemplate> jobTemplates = new ArrayList<>();
 
     String templateCatalogDir = 
this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
-    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), 
new Path(flowTemplateDirURI));
-    Path jobTemplatePath = new Path(templateDirPath, JOBS_DIR_NAME);
-    FileSystem fs = FileSystem.get(jobTemplatePath.toUri(), new 
Configuration());
-
-    for (FileStatus fileStatus : fs.listStatus(jobTemplatePath, 
extensionFilter)) {
-      Config templateConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
-      if (templateConfig.hasPath(JOB_TEMPLATE_KEY)) {
-        URI templateUri = new URI(templateConfig.getString(JOB_TEMPLATE_KEY));
-        //Strip out the initial "/"
-        URI actualResourceUri = new URI(templateUri.getPath().substring(1));
-        Path fullTemplatePath =
-            new 
Path(FSFlowCatalog.class.getClassLoader().getResource(actualResourceUri.getPath()).toURI());
-        templateConfig = 
templateConfig.withFallback(loadHoconFileAtPath(fullTemplatePath, true));
+
+    //Flow templates are located under templateCatalogDir/flowEdgeTemplates
+    Path flowTemplateDirPath = PathUtils.mergePaths(new 
Path(templateCatalogDir), new Path(flowTemplateDirURI));
+    //Job files (with extension .job) are located under 
templateCatalogDir/flowEdgeTemplates/jobs directory.
+    Path jobFilePath = new Path(flowTemplateDirPath, JOBS_DIR_NAME);
+
+    FileSystem fs = FileSystem.get(jobFilePath.toUri(), new Configuration());
+
+    for (FileStatus fileStatus : fs.listStatus(jobFilePath, extensionFilter)) {
+      Config jobConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
+      //Check if the .job file has an underlying job template
+      if (jobConfig.hasPath(JOB_TEMPLATE_KEY)) {
+        URI jobTemplateRelativeUri = new 
URI(jobConfig.getString(JOB_TEMPLATE_KEY));
+        if (!jobTemplateRelativeUri.getScheme().equals(FS_SCHEME)) {
+          throw new RuntimeException(
+              "Expected scheme " + FS_SCHEME + " got unsupported scheme " + 
flowTemplateDirURI.getScheme());
+        }
+        Path fullJobTemplatePath = PathUtils.mergePaths(new 
Path(templateCatalogDir), new Path(jobTemplateRelativeUri));
+        jobConfig = 
jobConfig.withFallback(loadHoconFileAtPath(fullJobTemplatePath, true));
       }
-      jobTemplates.add(new HOCONInputStreamJobTemplate(templateConfig, 
fileStatus.getPath().toUri(), this));
+      jobTemplates.add(new HOCONInputStreamJobTemplate(jobConfig, 
fileStatus.getPath().toUri(), this));
     }
     return jobTemplates;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
index 0d4f7c3..5177ce6 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
@@ -1 +1 @@
-gobblin.template.uri="resource:///template_catalog/templates/job1.template"
+gobblin.template.uri="FS:///templates/job1.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
index 3c8f864..85d9c37 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
@@ -1,2 +1,2 @@
-gobblin.template.uri="resource:///template_catalog/templates/job2.template"
+gobblin.template.uri="FS:///templates/job2.template"
 job.dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
index 44fa808..dbba7ae 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
@@ -1,2 +1,2 @@
-gobblin.template.uri="resource:///template_catalog/templates/job3.template"
+gobblin.template.uri="FS:///templates/job3.template"
 job.dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
index 5f3e1ff..629bfbc 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
@@ -1,2 +1,2 @@
-gobblin.template.uri="resource:///template_catalog/templates/job4.template"
+gobblin.template.uri="FS:///templates/job4.template"
 job.dependencies="job2,job3"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
index cda75cf..e0c8fa4 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
@@ -1 +1 @@
-gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template"
+gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
index 46c43c2..8c05781 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
@@ -1,4 +1,4 @@
-gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-retention.template"
+gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-retention.template"
 #Should Retention job run in parallel with the next job?
 job.forkOnConcat=true
 #Dataset version finder class

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
index 37d0d9c..429d4c0 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
@@ -1 +1 @@
-gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
\ No newline at end of file
+gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
index fe627c9..2d1672c 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
@@ -1 +1 @@
-gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template"
+gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
index fe627c9..2d1672c 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
@@ -1 +1 @@
-gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template"
+gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"

Reply via email to