Repository: incubator-gobblin Updated Branches: refs/heads/master b9149bfd9 -> 2509f3a3d
[GOBBLIN-604] Map dependencies in job templates to the job names in compiled JobSpecs. Closes #2470 from sv2000/dependencies Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2509f3a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2509f3a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2509f3a3 Branch: refs/heads/master Commit: 2509f3a3d43ecc35caebe6e4b5ce283b035867b4 Parents: b9149bf Author: sv2000 <[email protected]> Authored: Wed Oct 10 21:09:49 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Oct 10 21:09:49 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 4 +- .../service/modules/flow/FlowGraphPath.java | 99 +++++++++++++++++++- .../service/modules/spec/JobExecutionPlan.java | 12 ++- .../spec/JobExecutionPlanDagFactory.java | 26 ++--- .../modules/flow/MultiHopFlowCompilerTest.java | 52 ++++++++-- .../flowEdgeTemplate/jobs/job2.job | 3 +- .../flowEdgeTemplate/jobs/job3.job | 2 +- .../flowEdgeTemplate/jobs/job4.job | 2 +- .../template_catalog/templates/job1.template | 1 + .../template_catalog/templates/job2.template | 1 + .../template_catalog/templates/job3.template | 1 + .../template_catalog/templates/job4.template | 1 + .../test-template/jobs/job2.job | 3 +- .../test-template/jobs/job3.job | 2 +- .../test-template/jobs/job4.job | 2 +- 15 files changed, 168 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index f2f58f1..b624511 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -175,6 +175,7 @@ public class ConfigurationKeys { public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval"; public static final String WORK_UNIT_ENABLE_TRACKING_LOGS = "workunit.enableTrackingLogs"; + public static final String JOB_DEPENDENCIES = "job.dependencies"; public static final String JOB_RUN_ONCE_KEY = "job.runonce"; public static final String JOB_DISABLED_KEY = "job.disabled"; public static final String JOB_JAR_FILES_KEY = "job.jars"; @@ -207,10 +208,9 @@ public class ConfigurationKeys { public static final String JOB_TEMPLATE_PATH = "job.template"; /** - * Configuration property used only for job configuration file's tempalte, inside .template file + * Configuration property used only for job configuration file's template */ public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes"; - public static final String JOB_DEPENDENCIES = "dependencies"; /** * Configuration for emitting job events http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java index 4b81f1f..0c28c3b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java @@ -17,16 +17,29 @@ package org.apache.gobblin.service.modules.flow; +import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.io.Files; import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; import lombok.Getter; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -36,6 +49,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; import org.apache.gobblin.service.modules.template.FlowTemplate; +import org.apache.gobblin.util.ConfigUtils; /** @@ -62,12 +76,12 @@ public class FlowGraphPath { public Dag<JobExecutionPlan> asDag() throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>()); - for(List<FlowEdgeContext> path: paths) { + for (List<FlowEdgeContext> path: paths) { Dag<JobExecutionPlan> pathDag = new Dag<>(new ArrayList<>()); Iterator<FlowEdgeContext> pathIterator = path.iterator(); while (pathIterator.hasNext()) { Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next()); - pathDag = pathDag.concatenate(flowEdgeDag); + pathDag = concatenate(pathDag, flowEdgeDag); } flowDag = flowDag.merge(pathDag); } @@ -75,12 +89,35 @@ public class FlowGraphPath { } /** + * Concatenate two {@link Dag}s. Modify the {@link ConfigurationKeys#JOB_DEPENDENCIES} in the {@link JobSpec}s of the child + * {@link Dag} to reflect the concatenation operation. + * @param dagLeft The parent dag. + * @param dagRight The child dag. + * @return The concatenated dag with modified {@link ConfigurationKeys#JOB_DEPENDENCIES}. + */ + private Dag<JobExecutionPlan> concatenate(Dag<JobExecutionPlan> dagLeft, Dag<JobExecutionPlan> dagRight) { + List<Dag.DagNode<JobExecutionPlan>> endNodes = dagLeft.getEndNodes(); + List<Dag.DagNode<JobExecutionPlan>> startNodes = dagRight.getStartNodes(); + List<String> dependenciesList = Lists.newArrayList(); + for (Dag.DagNode<JobExecutionPlan> dagNode: endNodes) { + dependenciesList.add(dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)); + } + String dependencies = Joiner.on(",").join(dependenciesList); + + for (Dag.DagNode<JobExecutionPlan> childNode: startNodes) { + JobSpec jobSpec = childNode.getValue().getJobSpec(); + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef(dependencies))); + } + + return dagLeft.concatenate(dagRight); + } + /** * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < JobExecutionPlan >} that moves data * from the source of the {@link FlowEdge} to the destination of the {@link FlowEdge}. * @param flowEdgeContext an instance of {@link FlowEdgeContext}. * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge}. */ - private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext) + private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext) throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate(); DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor(); @@ -89,13 +126,67 @@ public class FlowGraphPath { SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor(); List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + Map<String, String> templateToJobNameMap = new HashMap<>(); //Get resolved job configs from the flow template List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor); //Iterate over each resolved job config and convert the config to a JobSpec. for (Config resolvedJobConfig : resolvedJobConfigs) { - jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId)); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId); + jobExecutionPlans.add(jobExecutionPlan); + templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), jobExecutionPlan.getJobSpec().getConfig().getString( + ConfigurationKeys.JOB_NAME_KEY)); } + updateJobDependencies(jobExecutionPlans, templateToJobNameMap); return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); } + + /** + * The job template name is derived from the {@link org.apache.gobblin.runtime.api.JobTemplate} URI. It is the + * simple name of the path component of the URI. + * @param jobExecutionPlan + * @return the simple name of the job template from the URI of its path. + */ + private static String getJobTemplateName(JobExecutionPlan jobExecutionPlan) { + Optional<URI> jobTemplateUri = jobExecutionPlan.getJobSpec().getTemplateURI(); + if (jobTemplateUri.isPresent()) { + return Files.getNameWithoutExtension(new Path(jobTemplateUri.get()).getName()); + } else { + return null; + } + } + + /** + * A method to modify the {@link ConfigurationKeys#JOB_DEPENDENCIES} specified in a {@link JobTemplate} to those + * which are usable in a {@link JobSpec}. + * The {@link ConfigurationKeys#JOB_DEPENDENCIES} specified in a JobTemplate use the JobTemplate names + * (i.e. the file names of the templates without the extension). However, the same {@link FlowTemplate} may be used + * across multiple {@link FlowEdge}s. To ensure that we capture dependencies between jobs correctly as Dags from + * successive hops are merged, we translate the {@link JobTemplate} name specified in the dependencies config to + * {@link ConfigurationKeys#JOB_NAME_KEY} from the corresponding {@link JobSpec}, which is guaranteed to be globally unique. + * For example, consider a {@link JobTemplate} with URI job1.job which has "job.dependencies=job2,job3" (where job2.job and job3.job are + * URIs of other {@link JobTemplate}s). Also, let the job.name config for the three jobs (after {@link JobSpec} is compiled) be as follows: + * "job.name=flowgrp1_flowName1_jobName1_1111", "job.name=flowgrp1_flowName1_jobName2_1121", and "job.name=flowgrp1_flowName1_jobName3_1131". Then, + * for job1, this method will set "job.dependencies=flowgrp1_flowName1_jobName2_1121, flowgrp1_flowName1_jobName3_1131". + * @param jobExecutionPlans a list of {@link JobExecutionPlan}s + * @param templateToJobNameMap a HashMap that has the mapping from the {@link JobTemplate} names to job.name in corresponding + * {@link JobSpec} + */ + private void updateJobDependencies(List<JobExecutionPlan> jobExecutionPlans, Map<String, String> templateToJobNameMap) { + for (JobExecutionPlan jobExecutionPlan: jobExecutionPlans) { + JobSpec jobSpec = jobExecutionPlan.getJobSpec(); + List<String> updatedDependenciesList = new ArrayList<>(); + if (jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES)) { + for (String dependency : ConfigUtils.getStringList(jobSpec.getConfig(), ConfigurationKeys.JOB_DEPENDENCIES)) { + if (!templateToJobNameMap.containsKey(dependency)) { + //We should never hit this condition. The logic here is a safety check. + throw new RuntimeException("TemplateToJobNameMap does not contain dependency " + dependency); + } + updatedDependenciesList.add(templateToJobNameMap.get(dependency)); + } + String updatedDependencies = Joiner.on(",").join(updatedDependenciesList); + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef(updatedDependencies))); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index 438bed0..023e104 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.spec; import java.net.URI; import java.net.URISyntaxException; +import java.util.Random; import org.apache.commons.lang3.StringUtils; @@ -49,6 +50,8 @@ public class JobExecutionPlan { private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN; public static class Factory { + public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_"; + private static final Random random = new Random(); public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, SpecExecutor specExecutor, Long flowExecutionId) throws URISyntaxException { @@ -69,8 +72,8 @@ public class JobExecutionPlan { String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, ""); String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); - //Modify the job name to include the flow group:flow name. - jobName = Joiner.on(":").join(flowGroup, flowName, jobName); + //Modify the job name to include the flow group, flow name and a randomly generated integer to make the job name unique. + jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, random.nextInt(Integer.MAX_VALUE)); JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig) .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion()); @@ -79,6 +82,9 @@ public class JobExecutionPlan { URI jobTemplateUri = new URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH)); JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build(); + //Add flowGroup to job spec + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))); + //Add flowName to job spec jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))); @@ -109,7 +115,7 @@ public class JobExecutionPlan { * A naive implementation of generating a jobSpec's URI within a multi-hop flow that follows the convention: * <JOB_CATALOG_SCHEME>/{@link ConfigurationKeys#JOB_GROUP_KEY}/{@link ConfigurationKeys#JOB_NAME_KEY}. */ - public static URI jobSpecURIGenerator(String jobGroup, String jobName, FlowSpec flowSpec) + private static URI jobSpecURIGenerator(String jobGroup, String jobName, FlowSpec flowSpec) throws URISyntaxException { return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, flowSpec.getUri().getAuthority(), StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"), "/") + jobGroup http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java index f942f8d..ed6f47a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java @@ -17,7 +17,6 @@ package org.apache.gobblin.service.modules.spec; -import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -25,10 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.fs.Path; - -import com.google.common.base.Optional; -import com.google.common.io.Files; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -49,7 +44,7 @@ public class JobExecutionPlanDagFactory { public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans) { //Maintain a mapping between job name and the corresponding JobExecutionPlan. - Map<String, Dag.DagNode<JobExecutionPlan>> JobExecutionPlanMap = new HashMap<>(); + Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap = new HashMap<>(); List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); /** * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link JobSpec} in the flow. Add this node @@ -60,7 +55,7 @@ public class JobExecutionPlanDagFactory { dagNodeList.add(dagNode); String jobName = getJobName(jobExecutionPlan); if (jobName != null) { - JobExecutionPlanMap.put(jobName, dagNode); + jobExecutionPlanMap.put(jobName, dagNode); } } @@ -76,10 +71,10 @@ public class JobExecutionPlanDagFactory { if (jobName == null) { continue; } - Dag.DagNode<JobExecutionPlan> node = JobExecutionPlanMap.get(jobName); + Dag.DagNode<JobExecutionPlan> node = jobExecutionPlanMap.get(jobName); Collection<String> dependencies = getDependencies(jobExecutionPlan.getJobSpec().getConfig()); for (String dependency : dependencies) { - Dag.DagNode<JobExecutionPlan> parentNode = JobExecutionPlanMap.get(dependency); + Dag.DagNode<JobExecutionPlan> parentNode = jobExecutionPlanMap.get(dependency); node.addParentNode(parentNode); } } @@ -98,17 +93,12 @@ public class JobExecutionPlanDagFactory { } /** - * The job name is derived from the {@link org.apache.gobblin.runtime.api.JobTemplate} URI. It is the - * simple name of the path component of the URI. + * The job name is derived from the {@link ConfigurationKeys#JOB_NAME_KEY} config. It is assumed to be unique + * across all jobs in a {@link Dag}. * @param jobExecutionPlan - * @return the simple name from the URI path. + * @return the name of the job. */ private static String getJobName(JobExecutionPlan jobExecutionPlan) { - Optional<URI> jobTemplateUri = jobExecutionPlan.getJobSpec().getTemplateURI(); - if (jobTemplateUri.isPresent()) { - return Files.getNameWithoutExtension(new Path(jobTemplateUri.get()).getName()); - } else { - return null; - } + return jobExecutionPlan.getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index 8574dac..c04b2b9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -47,6 +47,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.base.Charsets; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.io.Files; import com.typesafe.config.Config; @@ -184,7 +185,12 @@ public class MultiHopFlowCompilerTest { //Ensure the resolved job config for the first hop has the correct substitutions. Config jobConfig = jobSpec.getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String flowGroup = "testFlowGroup"; + String flowName = "testFlowName"; + String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName1.startsWith(expectedJobName1)); String from = jobConfig.getString("from"); String to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/out/testTeam/testDataset"); @@ -211,7 +217,11 @@ public class MultiHopFlowCompilerTest { Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); jobSpecWithExecutor = secondHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); + String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "convert-to-json-and-encrypt"); + String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName2.startsWith(expectedJobName2)); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1); from = jobConfig.getString("from"); to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/out/testTeam/testDataset"); @@ -227,7 +237,11 @@ public class MultiHopFlowCompilerTest { Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); jobSpecWithExecutor = thirdHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName3.startsWith(expectedJobName3)); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2); from = jobConfig.getString("from"); to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); @@ -247,7 +261,11 @@ public class MultiHopFlowCompilerTest { Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); jobSpecWithExecutor = fourthHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); + String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "Distcp-HDFS-ADL"); + String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName4.startsWith(expectedJobName4)); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3); from = jobConfig.getString("from"); to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); @@ -288,7 +306,12 @@ public class MultiHopFlowCompilerTest { //Ensure the resolved job config for the first hop has the correct substitutions. Config jobConfig = jobSpec.getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String flowGroup = "testFlowGroup"; + String flowName = "testFlowName"; + String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName1.startsWith(expectedJobName1)); String from = jobConfig.getString("from"); String to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/out/testTeam/testDataset"); @@ -315,7 +338,11 @@ public class MultiHopFlowCompilerTest { Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); jobExecutionPlan = secondHopNode.getValue(); jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); + String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "convert-to-json-and-encrypt"); + String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName2.startsWith(expectedJobName2)); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1); from = jobConfig.getString("from"); to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/out/testTeam/testDataset"); @@ -331,7 +358,11 @@ public class MultiHopFlowCompilerTest { Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); jobExecutionPlan = thirdHopNode.getValue(); jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName3.startsWith(expectedJobName3)); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2); from = jobConfig.getString("from"); to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); @@ -351,7 +382,12 @@ public class MultiHopFlowCompilerTest { Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); jobExecutionPlan = fourthHopNode.getValue(); jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); + + String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, "Distcp-HDFS-ADL"); + String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobName4.startsWith(expectedJobName4)); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3); from = jobConfig.getString("from"); to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job index c26ade4..3c8f864 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job @@ -1,3 +1,2 @@ gobblin.template.uri="resource:///template_catalog/templates/job2.template" - -dependencies=job1 +job.dependencies=job1 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job index cac20ed..44fa808 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job @@ -1,2 +1,2 @@ gobblin.template.uri="resource:///template_catalog/templates/job3.template" -dependencies=job1 +job.dependencies=job1 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job index 9b86c77..5f3e1ff 100644 --- a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job +++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job @@ -1,2 +1,2 @@ gobblin.template.uri="resource:///template_catalog/templates/job4.template" -dependencies="job2,job3" +job.dependencies="job2,job3" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job1.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job1.template b/gobblin-service/src/test/resources/template_catalog/templates/job1.template index 321e984..8ce0f15 100644 --- a/gobblin-service/src/test/resources/template_catalog/templates/job1.template +++ b/gobblin-service/src/test/resources/template_catalog/templates/job1.template @@ -1,2 +1,3 @@ +job.name=job1 key11=val11 key12=val12 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job2.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job2.template b/gobblin-service/src/test/resources/template_catalog/templates/job2.template index 5141d92..eb8af51 100644 --- a/gobblin-service/src/test/resources/template_catalog/templates/job2.template +++ b/gobblin-service/src/test/resources/template_catalog/templates/job2.template @@ -1,2 +1,3 @@ +job.name=job2 key21=val21 key22=val22 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job3.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job3.template b/gobblin-service/src/test/resources/template_catalog/templates/job3.template index c192cc4..e932f5c 100644 --- a/gobblin-service/src/test/resources/template_catalog/templates/job3.template +++ b/gobblin-service/src/test/resources/template_catalog/templates/job3.template @@ -1,2 +1,3 @@ +job.name=job3 key31=val31 key32=val32 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job4.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job4.template b/gobblin-service/src/test/resources/template_catalog/templates/job4.template index a6a508e..8cf843a 100644 --- a/gobblin-service/src/test/resources/template_catalog/templates/job4.template +++ b/gobblin-service/src/test/resources/template_catalog/templates/job4.template @@ -1,2 +1,3 @@ +job.name=job4 key41=val41 key42=val42 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job index c4db05f..93c0fe6 100644 --- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job +++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job @@ -1,3 +1,2 @@ gobblin.template.uri=resource:///template_catalog/templates/job2.template - -dependencies=job1 +job.dependencies=job1 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job index 59867b3..7f29c0a 100644 --- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job +++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job @@ -1,2 +1,2 @@ gobblin.template.uri=resource:///template_catalog/templates/job3.template -dependencies=job1 +job.dependencies=job1 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job index 8fdc611..c80b296 100644 --- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job +++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job @@ -1,2 +1,2 @@ gobblin.template.uri=resource:///template_catalog/templates/job4.template -dependencies=job2,job3 +job.dependencies=job2,job3
