Repository: incubator-gobblin Updated Branches: refs/heads/master 2631c5ae6 -> 94bdc6f39
[GOBBLIN-636] Use FS scheme and relative URIs for specifying job template locations in GaaS. Closes #2506 from sv2000/templateUri Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/94bdc6f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/94bdc6f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/94bdc6f3 Branch: refs/heads/master Commit: 94bdc6f3932b4b55b7f4b26d869c594b6f88bb35 Parents: 2631c5a Author: suvasude <[email protected]> Authored: Tue Nov 27 14:15:06 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Nov 27 14:15:06 2018 -0800 ---------------------------------------------------------------------- .../service/modules/spec/JobExecutionPlan.java | 4 +++ .../modules/template_catalog/FSFlowCatalog.java | 36 ++++++++++++-------- .../flowEdgeTemplate/jobs/job1.job | 2 +- .../flowEdgeTemplate/jobs/job2.job | 2 +- .../flowEdgeTemplate/jobs/job3.job | 2 +- .../flowEdgeTemplate/jobs/job4.job | 2 +- .../jobs/hdfs-encrypt-avro-to-json.job | 2 +- .../jobs/hdfs-snapshot-retention.job | 2 +- .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job | 2 +- .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job | 2 +- .../localToHdfs/jobs/distcp-local-to-hdfs.job | 2 +- 11 files changed, 34 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index 7517c17..685513c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -37,6 +37,7 @@ import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; import org.apache.gobblin.service.modules.orchestration.DagManager; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; import org.apache.gobblin.util.ConfigUtils; @@ -103,6 +104,9 @@ public class JobExecutionPlan { // Remove schedule jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)); + //Remove template uri + jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowCatalog.JOB_TEMPLATE_KEY)); + // Add job.name and job.group jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName))); jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java index 59c3c87..e1f2b6a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java @@ -107,7 +107,7 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla /** * - * @param flowTemplateDirURI URI of the flow template directory + * @param flowTemplateDirURI Relative URI of the flow template directory * @return a list of {@link JobTemplate}s for a given flow identified by its {@link URI}. * @throws IOException * @throws SpecNotFoundException @@ -135,21 +135,27 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla List<JobTemplate> jobTemplates = new ArrayList<>(); String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY); - Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI)); - Path jobTemplatePath = new Path(templateDirPath, JOBS_DIR_NAME); - FileSystem fs = FileSystem.get(jobTemplatePath.toUri(), new Configuration()); - - for (FileStatus fileStatus : fs.listStatus(jobTemplatePath, extensionFilter)) { - Config templateConfig = loadHoconFileAtPath(fileStatus.getPath(), true); - if (templateConfig.hasPath(JOB_TEMPLATE_KEY)) { - URI templateUri = new URI(templateConfig.getString(JOB_TEMPLATE_KEY)); - //Strip out the initial "/" - URI actualResourceUri = new URI(templateUri.getPath().substring(1)); - Path fullTemplatePath = - new Path(FSFlowCatalog.class.getClassLoader().getResource(actualResourceUri.getPath()).toURI()); - templateConfig = templateConfig.withFallback(loadHoconFileAtPath(fullTemplatePath, true)); + + //Flow templates are located under templateCatalogDir/flowEdgeTemplates + Path flowTemplateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI)); + //Job files (with extension .job) are located under templateCatalogDir/flowEdgeTemplates/jobs directory. + Path jobFilePath = new Path(flowTemplateDirPath, JOBS_DIR_NAME); + + FileSystem fs = FileSystem.get(jobFilePath.toUri(), new Configuration()); + + for (FileStatus fileStatus : fs.listStatus(jobFilePath, extensionFilter)) { + Config jobConfig = loadHoconFileAtPath(fileStatus.getPath(), true); + //Check if the .job file has an underlying job template + if (jobConfig.hasPath(JOB_TEMPLATE_KEY)) { + URI jobTemplateRelativeUri = new URI(jobConfig.getString(JOB_TEMPLATE_KEY)); + if (!jobTemplateRelativeUri.getScheme().equals(FS_SCHEME)) { + throw new RuntimeException( + "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme()); + } + Path fullJobTemplatePath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(jobTemplateRelativeUri)); + jobConfig = jobConfig.withFallback(loadHoconFileAtPath(fullJobTemplatePath, true)); } - jobTemplates.add(new HOCONInputStreamJobTemplate(templateConfig, fileStatus.getPath().toUri(), this)); + jobTemplates.add(new HOCONInputStreamJobTemplate(jobConfig, fileStatus.getPath().toUri(), this)); } return jobTemplates; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job index 0d4f7c3..5177ce6 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job @@ -1 +1 @@ -gobblin.template.uri="resource:///template_catalog/templates/job1.template" +gobblin.template.uri="FS:///templates/job1.template" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job index 3c8f864..85d9c37 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job @@ -1,2 +1,2 @@ -gobblin.template.uri="resource:///template_catalog/templates/job2.template" +gobblin.template.uri="FS:///templates/job2.template" job.dependencies=job1 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job index 44fa808..dbba7ae 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job @@ -1,2 +1,2 @@ -gobblin.template.uri="resource:///template_catalog/templates/job3.template" +gobblin.template.uri="FS:///templates/job3.template" job.dependencies=job1 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job index 5f3e1ff..629bfbc 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job @@ -1,2 +1,2 @@ -gobblin.template.uri="resource:///template_catalog/templates/job4.template" +gobblin.template.uri="FS:///templates/job4.template" job.dependencies="job2,job3" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job index cda75cf..e0c8fa4 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job @@ -1 +1 @@ -gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template" +gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job index 46c43c2..8c05781 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job @@ -1,4 +1,4 @@ -gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-retention.template" +gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-retention.template" #Should Retention job run in parallel with the next job? job.forkOnConcat=true #Dataset version finder class http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job index 37d0d9c..429d4c0 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job @@ -1 +1 @@ -gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template" \ No newline at end of file +gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job index fe627c9..2d1672c 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job @@ -1 +1 @@ -gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template" +gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bdc6f3/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job index fe627c9..2d1672c 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job @@ -1 +1 @@ -gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template" +gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"
