Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b9149bfd9 -> 2509f3a3d


[GOBBLIN-604] Map dependencies in job templates to the job names in compiled 
JobSpecs.

Closes #2470 from sv2000/dependencies


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2509f3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2509f3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2509f3a3

Branch: refs/heads/master
Commit: 2509f3a3d43ecc35caebe6e4b5ce283b035867b4
Parents: b9149bf
Author: sv2000 <[email protected]>
Authored: Wed Oct 10 21:09:49 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Wed Oct 10 21:09:49 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  4 +-
 .../service/modules/flow/FlowGraphPath.java     | 99 +++++++++++++++++++-
 .../service/modules/spec/JobExecutionPlan.java  | 12 ++-
 .../spec/JobExecutionPlanDagFactory.java        | 26 ++---
 .../modules/flow/MultiHopFlowCompilerTest.java  | 52 ++++++++--
 .../flowEdgeTemplate/jobs/job2.job              |  3 +-
 .../flowEdgeTemplate/jobs/job3.job              |  2 +-
 .../flowEdgeTemplate/jobs/job4.job              |  2 +-
 .../template_catalog/templates/job1.template    |  1 +
 .../template_catalog/templates/job2.template    |  1 +
 .../template_catalog/templates/job3.template    |  1 +
 .../template_catalog/templates/job4.template    |  1 +
 .../test-template/jobs/job2.job                 |  3 +-
 .../test-template/jobs/job3.job                 |  2 +-
 .../test-template/jobs/job4.job                 |  2 +-
 15 files changed, 168 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f2f58f1..b624511 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -175,6 +175,7 @@ public class ConfigurationKeys {
   public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = 
"workunit.creation.and.run.interval";
   public static final String WORK_UNIT_ENABLE_TRACKING_LOGS = 
"workunit.enableTrackingLogs";
 
+  public static final String JOB_DEPENDENCIES = "job.dependencies";
   public static final String JOB_RUN_ONCE_KEY = "job.runonce";
   public static final String JOB_DISABLED_KEY = "job.disabled";
   public static final String JOB_JAR_FILES_KEY = "job.jars";
@@ -207,10 +208,9 @@ public class ConfigurationKeys {
   public static final String JOB_TEMPLATE_PATH = "job.template";
 
   /**
-   * Configuration property used only for job configuration file's tempalte, 
inside .template file
+   * Configuration property used only for job configuration file's template
    */
   public static final String REQUIRED_ATRRIBUTES_LIST = 
"gobblin.template.required_attributes";
-  public static final String JOB_DEPENDENCIES = "dependencies";
 
   /**
    * Configuration for emitting job events

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
index 4b81f1f..0c28c3b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -17,16 +17,29 @@
 
 package org.apache.gobblin.service.modules.flow;
 
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -36,6 +49,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -62,12 +76,12 @@ public class FlowGraphPath {
   public Dag<JobExecutionPlan> asDag() throws SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
     Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>());
 
-    for(List<FlowEdgeContext> path: paths) {
+    for (List<FlowEdgeContext> path: paths) {
       Dag<JobExecutionPlan> pathDag = new Dag<>(new ArrayList<>());
       Iterator<FlowEdgeContext> pathIterator = path.iterator();
       while (pathIterator.hasNext()) {
         Dag<JobExecutionPlan> flowEdgeDag = 
convertHopToDag(pathIterator.next());
-        pathDag = pathDag.concatenate(flowEdgeDag);
+        pathDag = concatenate(pathDag, flowEdgeDag);
       }
       flowDag = flowDag.merge(pathDag);
     }
@@ -75,12 +89,35 @@ public class FlowGraphPath {
   }
 
   /**
+   * Concatenate two {@link Dag}s. Modify the {@link 
ConfigurationKeys#JOB_DEPENDENCIES} in the {@link JobSpec}s of the child
+   * {@link Dag} to reflect the concatenation operation.
+   * @param dagLeft The parent dag.
+   * @param dagRight The child dag.
+   * @return The concatenated dag with modified {@link 
ConfigurationKeys#JOB_DEPENDENCIES}.
+   */
+  private Dag<JobExecutionPlan> concatenate(Dag<JobExecutionPlan> dagLeft, 
Dag<JobExecutionPlan> dagRight) {
+    List<Dag.DagNode<JobExecutionPlan>> endNodes = dagLeft.getEndNodes();
+    List<Dag.DagNode<JobExecutionPlan>> startNodes = dagRight.getStartNodes();
+    List<String> dependenciesList = Lists.newArrayList();
+    for (Dag.DagNode<JobExecutionPlan> dagNode: endNodes) {
+      
dependenciesList.add(dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+    }
+    String dependencies = Joiner.on(",").join(dependenciesList);
+
+    for (Dag.DagNode<JobExecutionPlan> childNode: startNodes) {
+      JobSpec jobSpec = childNode.getValue().getJobSpec();
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES,
 ConfigValueFactory.fromAnyRef(dependencies)));
+    }
+
+    return dagLeft.concatenate(dagRight);
+  }
+  /**
    * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < 
JobExecutionPlan >} that moves data
    * from the source of the {@link FlowEdge} to the destination of the {@link 
FlowEdge}.
    * @param flowEdgeContext an instance of {@link FlowEdgeContext}.
    * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the 
{@link FlowEdge}.
    */
-  private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext 
flowEdgeContext)
+   private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext 
flowEdgeContext)
       throws SpecNotFoundException, JobTemplate.TemplateException, 
URISyntaxException {
     FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
     DatasetDescriptor inputDatasetDescriptor = 
flowEdgeContext.getInputDatasetDescriptor();
@@ -89,13 +126,67 @@ public class FlowGraphPath {
     SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
 
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    Map<String, String> templateToJobNameMap = new HashMap<>();
 
     //Get resolved job configs from the flow template
     List<Config> resolvedJobConfigs = 
flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, 
outputDatasetDescriptor);
     //Iterate over each resolved job config and convert the config to a 
JobSpec.
     for (Config resolvedJobConfig : resolvedJobConfigs) {
-      jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, 
specExecutor, flowExecutionId));
+      JobExecutionPlan jobExecutionPlan = new 
JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, 
specExecutor, flowExecutionId);
+      jobExecutionPlans.add(jobExecutionPlan);
+      templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), 
jobExecutionPlan.getJobSpec().getConfig().getString(
+          ConfigurationKeys.JOB_NAME_KEY));
     }
+    updateJobDependencies(jobExecutionPlans, templateToJobNameMap);
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
+
+  /**
+   * The job template name is derived from the {@link 
org.apache.gobblin.runtime.api.JobTemplate} URI. It is the
+   * simple name of the path component of the URI.
+   * @param jobExecutionPlan
+   * @return the simple name of the job template from the URI of its path.
+   */
+  private static String getJobTemplateName(JobExecutionPlan jobExecutionPlan) {
+    Optional<URI> jobTemplateUri = 
jobExecutionPlan.getJobSpec().getTemplateURI();
+    if (jobTemplateUri.isPresent()) {
+      return Files.getNameWithoutExtension(new 
Path(jobTemplateUri.get()).getName());
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * A method to modify the {@link ConfigurationKeys#JOB_DEPENDENCIES} 
specified in a {@link JobTemplate} to those
+   * which are usable in a {@link JobSpec}.
+   * The {@link ConfigurationKeys#JOB_DEPENDENCIES} specified in a JobTemplate 
use the JobTemplate names
+   * (i.e. the file names of the templates without the extension). However, 
the same {@link FlowTemplate} may be used
+   * across multiple {@link FlowEdge}s. To ensure that we capture dependencies 
between jobs correctly as Dags from
+   * successive hops are merged, we translate the {@link JobTemplate} name 
specified in the dependencies config to
+   * {@link ConfigurationKeys#JOB_NAME_KEY} from the corresponding {@link 
JobSpec}, which is guaranteed to be globally unique.
+   * For example, consider a {@link JobTemplate} with URI job1.job which has 
"job.dependencies=job2,job3" (where job2.job and job3.job are
+   * URIs of other {@link JobTemplate}s). Also, let the job.name config for 
the three jobs (after {@link JobSpec} is compiled) be as follows:
+   *  "job.name=flowgrp1_flowName1_jobName1_1111", 
"job.name=flowgrp1_flowName1_jobName2_1121", and 
"job.name=flowgrp1_flowName1_jobName3_1131". Then,
+   *  for job1, this method will set 
"job.dependencies=flowgrp1_flowName1_jobName2_1121, 
flowgrp1_flowName1_jobName3_1131".
+   * @param jobExecutionPlans a list of {@link JobExecutionPlan}s
+   * @param templateToJobNameMap a HashMap that has the mapping from the 
{@link JobTemplate} names to job.name in corresponding
+   * {@link JobSpec}
+   */
+  private void updateJobDependencies(List<JobExecutionPlan> jobExecutionPlans, 
Map<String, String> templateToJobNameMap) {
+    for (JobExecutionPlan jobExecutionPlan: jobExecutionPlans) {
+      JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+      List<String> updatedDependenciesList = new ArrayList<>();
+      if (jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES)) {
+        for (String dependency : 
ConfigUtils.getStringList(jobSpec.getConfig(), 
ConfigurationKeys.JOB_DEPENDENCIES)) {
+          if (!templateToJobNameMap.containsKey(dependency)) {
+            //We should never hit this condition. The logic here is a safety 
check.
+            throw new RuntimeException("TemplateToJobNameMap does not contain 
dependency " + dependency);
+          }
+          updatedDependenciesList.add(templateToJobNameMap.get(dependency));
+        }
+        String updatedDependencies = 
Joiner.on(",").join(updatedDependenciesList);
+        
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES,
 ConfigValueFactory.fromAnyRef(updatedDependencies)));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 438bed0..023e104 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.spec;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Random;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -49,6 +50,8 @@ public class JobExecutionPlan {
   private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN;
 
   public static class Factory {
+    public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
+    private static final Random random = new Random();
 
     public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, 
SpecExecutor specExecutor, Long flowExecutionId)
         throws URISyntaxException {
@@ -69,8 +72,8 @@ public class JobExecutionPlan {
       String flowGroup = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
       String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
 
-      //Modify the job name to include the flow group:flow name.
-      jobName = Joiner.on(":").join(flowGroup, flowName, jobName);
+      //Modify the job name to include the flow group, flow name and a 
randomly generated integer to make the job name unique.
+      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, random.nextInt(Integer.MAX_VALUE));
 
       JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, 
flowSpec)).withConfig(jobConfig)
           
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
@@ -79,6 +82,9 @@ public class JobExecutionPlan {
       URI jobTemplateUri = new 
URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH));
       JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build();
 
+      //Add flowGroup to job spec
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
 ConfigValueFactory.fromAnyRef(flowGroup)));
+
       //Add flowName to job spec
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_NAME_KEY,
 ConfigValueFactory.fromAnyRef(flowName)));
 
@@ -109,7 +115,7 @@ public class JobExecutionPlan {
      * A naive implementation of generating a jobSpec's URI within a multi-hop 
flow that follows the convention:
      * <JOB_CATALOG_SCHEME>/{@link ConfigurationKeys#JOB_GROUP_KEY}/{@link 
ConfigurationKeys#JOB_NAME_KEY}.
      */
-    public static URI jobSpecURIGenerator(String jobGroup, String jobName, 
FlowSpec flowSpec)
+    private static URI jobSpecURIGenerator(String jobGroup, String jobName, 
FlowSpec flowSpec)
         throws URISyntaxException {
       return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, 
flowSpec.getUri().getAuthority(),
           
StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(),
 "/"), "/") + jobGroup

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index f942f8d..ed6f47a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.spec;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -25,10 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Optional;
-import com.google.common.io.Files;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
@@ -49,7 +44,7 @@ public class JobExecutionPlanDagFactory {
 
   public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> 
jobExecutionPlans) {
     //Maintain a mapping between job name and the corresponding 
JobExecutionPlan.
-    Map<String, Dag.DagNode<JobExecutionPlan>> JobExecutionPlanMap = new 
HashMap<>();
+    Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap = new 
HashMap<>();
     List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
     /**
      * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link 
JobSpec} in the flow. Add this node
@@ -60,7 +55,7 @@ public class JobExecutionPlanDagFactory {
       dagNodeList.add(dagNode);
       String jobName = getJobName(jobExecutionPlan);
       if (jobName != null) {
-        JobExecutionPlanMap.put(jobName, dagNode);
+        jobExecutionPlanMap.put(jobName, dagNode);
       }
     }
 
@@ -76,10 +71,10 @@ public class JobExecutionPlanDagFactory {
       if (jobName == null) {
         continue;
       }
-      Dag.DagNode<JobExecutionPlan> node = JobExecutionPlanMap.get(jobName);
+      Dag.DagNode<JobExecutionPlan> node = jobExecutionPlanMap.get(jobName);
       Collection<String> dependencies = 
getDependencies(jobExecutionPlan.getJobSpec().getConfig());
       for (String dependency : dependencies) {
-        Dag.DagNode<JobExecutionPlan> parentNode = 
JobExecutionPlanMap.get(dependency);
+        Dag.DagNode<JobExecutionPlan> parentNode = 
jobExecutionPlanMap.get(dependency);
         node.addParentNode(parentNode);
       }
     }
@@ -98,17 +93,12 @@ public class JobExecutionPlanDagFactory {
   }
 
   /**
-   * The job name is derived from the {@link 
org.apache.gobblin.runtime.api.JobTemplate} URI. It is the
-   * simple name of the path component of the URI.
+   * The job name is derived from the {@link ConfigurationKeys#JOB_NAME_KEY} 
config. It is assumed to be unique
+   * across all jobs in a {@link Dag}.
    * @param jobExecutionPlan
-   * @return the simple name from the URI path.
+   * @return the name of the job.
    */
   private static String getJobName(JobExecutionPlan jobExecutionPlan) {
-    Optional<URI> jobTemplateUri = 
jobExecutionPlan.getJobSpec().getTemplateURI();
-    if (jobTemplateUri.isPresent()) {
-      return Files.getNameWithoutExtension(new 
Path(jobTemplateUri.get()).getName());
-    } else {
-      return null;
-    }
+    return 
jobExecutionPlan.getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 8574dac..c04b2b9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -47,6 +47,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
@@ -184,7 +185,12 @@ public class MultiHopFlowCompilerTest {
 
     //Ensure the resolved job config for the first hop has the correct 
substitutions.
     Config jobConfig = jobSpec.getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String flowGroup = "testFlowGroup";
+    String flowName = "testFlowName";
+    String expectedJobName1 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+    String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName1.startsWith(expectedJobName1));
     String from = jobConfig.getString("from");
     String to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -211,7 +217,11 @@ public class MultiHopFlowCompilerTest {
     Dag.DagNode<JobExecutionPlan> secondHopNode = 
jobDag.getChildren(startNode).get(0);
     jobSpecWithExecutor = secondHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    String expectedJobName2 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "convert-to-json-and-encrypt");
+    String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName2.startsWith(expectedJobName2));
+    
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName1);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -227,7 +237,11 @@ public class MultiHopFlowCompilerTest {
     Dag.DagNode<JobExecutionPlan> thirdHopNode = 
jobDag.getChildren(secondHopNode).get(0);
     jobSpecWithExecutor = thirdHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String expectedJobName3 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+    String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName3.startsWith(expectedJobName3));
+    
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName2);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
@@ -247,7 +261,11 @@ public class MultiHopFlowCompilerTest {
     Dag.DagNode<JobExecutionPlan> fourthHopNode = 
jobDag.getChildren(thirdHopNode).get(0);
     jobSpecWithExecutor = fourthHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    String expectedJobName4 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "Distcp-HDFS-ADL");
+    String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName4.startsWith(expectedJobName4));
+    
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName3);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
@@ -288,7 +306,12 @@ public class MultiHopFlowCompilerTest {
 
     //Ensure the resolved job config for the first hop has the correct 
substitutions.
     Config jobConfig = jobSpec.getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String flowGroup = "testFlowGroup";
+    String flowName = "testFlowName";
+    String expectedJobName1 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+    String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName1.startsWith(expectedJobName1));
     String from = jobConfig.getString("from");
     String to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -315,7 +338,11 @@ public class MultiHopFlowCompilerTest {
     Dag.DagNode<JobExecutionPlan> secondHopNode = 
jobDag.getChildren(startNode).get(0);
     jobExecutionPlan = secondHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    String expectedJobName2 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "convert-to-json-and-encrypt");
+    String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName2.startsWith(expectedJobName2));
+    
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName1);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -331,7 +358,11 @@ public class MultiHopFlowCompilerTest {
     Dag.DagNode<JobExecutionPlan> thirdHopNode = 
jobDag.getChildren(secondHopNode).get(0);
     jobExecutionPlan = thirdHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String expectedJobName3 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+    String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName3.startsWith(expectedJobName3));
+    
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName2);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
@@ -351,7 +382,12 @@ public class MultiHopFlowCompilerTest {
     Dag.DagNode<JobExecutionPlan> fourthHopNode = 
jobDag.getChildren(thirdHopNode).get(0);
     jobExecutionPlan = fourthHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), 
"testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+
+    String expectedJobName4 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join(flowGroup, flowName, "Distcp-HDFS-ADL");
+    String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName4.startsWith(expectedJobName4));
+    
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName3);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
index c26ade4..3c8f864 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
@@ -1,3 +1,2 @@
 gobblin.template.uri="resource:///template_catalog/templates/job2.template"
-
-dependencies=job1
+job.dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
index cac20ed..44fa808 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
@@ -1,2 +1,2 @@
 gobblin.template.uri="resource:///template_catalog/templates/job3.template"
-dependencies=job1
+job.dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
index 9b86c77..5f3e1ff 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
@@ -1,2 +1,2 @@
 gobblin.template.uri="resource:///template_catalog/templates/job4.template"
-dependencies="job2,job3"
+job.dependencies="job2,job3"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job1.template
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/templates/job1.template 
b/gobblin-service/src/test/resources/template_catalog/templates/job1.template
index 321e984..8ce0f15 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/templates/job1.template
+++ 
b/gobblin-service/src/test/resources/template_catalog/templates/job1.template
@@ -1,2 +1,3 @@
+job.name=job1
 key11=val11
 key12=val12
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job2.template
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/templates/job2.template 
b/gobblin-service/src/test/resources/template_catalog/templates/job2.template
index 5141d92..eb8af51 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/templates/job2.template
+++ 
b/gobblin-service/src/test/resources/template_catalog/templates/job2.template
@@ -1,2 +1,3 @@
+job.name=job2
 key21=val21
 key22=val22
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job3.template
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/templates/job3.template 
b/gobblin-service/src/test/resources/template_catalog/templates/job3.template
index c192cc4..e932f5c 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/templates/job3.template
+++ 
b/gobblin-service/src/test/resources/template_catalog/templates/job3.template
@@ -1,2 +1,3 @@
+job.name=job3
 key31=val31
 key32=val32
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/templates/job4.template
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/templates/job4.template 
b/gobblin-service/src/test/resources/template_catalog/templates/job4.template
index a6a508e..8cf843a 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/templates/job4.template
+++ 
b/gobblin-service/src/test/resources/template_catalog/templates/job4.template
@@ -1,2 +1,3 @@
+job.name=job4
 key41=val41
 key42=val42
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
 
b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
index c4db05f..93c0fe6 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
@@ -1,3 +1,2 @@
 gobblin.template.uri=resource:///template_catalog/templates/job2.template
-
-dependencies=job1
+job.dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
 
b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
index 59867b3..7f29c0a 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
@@ -1,2 +1,2 @@
 gobblin.template.uri=resource:///template_catalog/templates/job3.template
-dependencies=job1
+job.dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2509f3a3/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
 
b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
index 8fdc611..c80b296 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
+++ 
b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
@@ -1,2 +1,2 @@
 gobblin.template.uri=resource:///template_catalog/templates/job4.template
-dependencies=job2,job3
+job.dependencies=job2,job3

Reply via email to