This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 86515b9  [GOBBLIN-810] Include flow edge ID in job name
86515b9 is described below

commit 86515b99fac8d4dd12ca99c79674f0faf572736f
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Jul 15 10:53:15 2019 -0700

    [GOBBLIN-810] Include flow edge ID in job name
    
    Closes #2675 from jack-moseley/job-name-conflict
---
 .../service/modules/core/GitFlowGraphMonitor.java  |  2 +-
 .../service/modules/spec/JobExecutionPlan.java     |  7 ++--
 .../modules/core/GitFlowGraphMonitorTest.java      |  2 +-
 .../modules/flow/MultiHopFlowCompilerTest.java     | 38 +++++++++--------
 .../spec/JobExecutionPlanDagFactoryTest.java       | 49 ++++++++++++++++++++++
 .../adls-1-to-adls-1-retention-1.properties        |  2 +-
 .../adls-1-to-adls-1-retention-2.properties        |  2 +-
 .../flowedges/hdfs-1-to-hdfs-1-encrypt.properties  |  2 +-
 .../hdfs-1-to-hdfs-1-retention.properties          |  2 +-
 .../flowedges/hdfs-1-to-hdfs-3.properties          |  2 +-
 .../flowedges/hdfs-2-hdfs-2-retention.properties   |  2 +-
 .../flowedges/hdfs-2-to-hdfs-2-encrypt.properties  |  2 +-
 .../flowedges/hdfs-2-to-hdfs-4.properties          |  2 +-
 .../flowedges/hdfs-3-to-adls-1.properties          |  2 +-
 .../hdfs-3-to-hdfs-3-retention.properties          |  2 +-
 .../flowedges/hdfs-4-to-adls-1.properties          |  2 +-
 .../hdfs-4-to-hdfs-4-retention.properties          |  2 +-
 .../flowgraph/flowedges/local-to-hdfs-1.properties |  2 +-
 .../flowgraph/flowedges/local-to-hdfs-2.properties |  2 +-
 .../flowedges/local-to-local-retention.properties  |  2 +-
 20 files changed, 89 insertions(+), 39 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 5a69371..3d5a7bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -68,7 +68,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
 
   private static final String PROPERTIES_EXTENSIONS = "properties";
   private static final String CONF_EXTENSIONS = StringUtils.EMPTY;
-  private static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
+  private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
   private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = 
"git-flowgraph";
   private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = 
"gobblin-flowgraph";
   private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = 
"master";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index bb7955f..d0ec901 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -85,11 +85,10 @@ public class JobExecutionPlan {
       String flowFailureOption = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
 
       String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
-      String source = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
-      String destination = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "");
+      String edgeId = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
 
-      //Modify the job name to include the flow group, flow name and source 
and destination node ids for the job.
-      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, source, destination);
+      //Modify the job name to include the flow group, flow name and edge id.
+      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, edgeId);
 
       JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, 
flowSpec)).withConfig(jobConfig)
           
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 6a02652..c07b56e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -342,7 +342,7 @@ public class GitFlowGraphMonitorTest {
     Set<FlowEdge> edgeSet = this.flowGraph.getEdges(node1);
     Assert.assertEquals(edgeSet.size(), 1);
     FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getId(), Joiner.on(":").join(node1, node2, 
edgeName));
+    Assert.assertEquals(flowEdge.getId(), Joiner.on("_").join(node1, node2, 
edgeName));
     Assert.assertEquals(flowEdge.getSrc(), node1);
     Assert.assertEquals(flowEdge.getDest(), node2);
     
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 4bffdb1..c731714 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -85,6 +84,7 @@ import 
org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -244,7 +244,7 @@ public class MultiHopFlowCompilerTest {
     String flowGroup = "testFlowGroup";
     String flowName = "testFlowName";
     String expectedJobName1 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1");
+        join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1", 
"localToHdfs");
     String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName1, expectedJobName1);
     String from = jobConfig.getString("from");
@@ -274,7 +274,7 @@ public class MultiHopFlowCompilerTest {
     jobSpecWithExecutor = secondHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
     String expectedJobName2 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", 
"HDFS-1");
+        join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", 
"HDFS-1", "hdfsConvertToJsonAndEncrypt");
     String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName2, expectedJobName2);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName1);
@@ -294,7 +294,7 @@ public class MultiHopFlowCompilerTest {
     jobSpecWithExecutor = thirdHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
     String expectedJobName3 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3");
+        join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
     String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName3, expectedJobName3);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName2);
@@ -318,7 +318,7 @@ public class MultiHopFlowCompilerTest {
     jobSpecWithExecutor = fourthHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
     String expectedJobName4 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1");
+        join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1", 
"hdfsToAdl");
     String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName4, expectedJobName4);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName3);
@@ -362,6 +362,8 @@ public class MultiHopFlowCompilerTest {
         "Distcp", "SnapshotRetention", "DistcpToADL", "SnapshotRetention");
     List<String> sourceNodes = Lists.newArrayList("LocalFS-1", "LocalFS-1", 
"HDFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1");
     List<String> destinationNodes = Lists.newArrayList("LocalFS-1", "HDFS-1", 
"HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1", "ADLS-1");
+    List<String> edgeNames = Lists.newArrayList("localRetention", 
"localToHdfs", "hdfsRetention",
+        "hdfsConvertToJsonAndEncrypt", "hdfsRetention", "hdfsToHdfs", 
"hdfsRetention", "hdfsToAdl", "hdfsRemoteRetention");
 
     List<DagNode<JobExecutionPlan>> nextHopNodes = new ArrayList<>();
     for (int i = 0; i < 9; i += 2) {
@@ -372,10 +374,10 @@ public class MultiHopFlowCompilerTest {
       }
       Set<String> jobNames = new HashSet<>();
       
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-          join(flowGroup, flowName, expectedJobNames.get(i), 
sourceNodes.get(i), destinationNodes.get(i)));
+          join(flowGroup, flowName, expectedJobNames.get(i), 
sourceNodes.get(i), destinationNodes.get(i), edgeNames.get(i)));
       if (i < 8) {
         
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-            join(flowGroup, flowName, expectedJobNames.get(i + 1), 
sourceNodes.get(i + 1), destinationNodes.get(i + 1)));
+            join(flowGroup, flowName, expectedJobNames.get(i + 1), 
sourceNodes.get(i + 1), destinationNodes.get(i + 1), edgeNames.get(i + 1)));
       }
 
       for (DagNode<JobExecutionPlan> dagNode : currentHopNodes) {
@@ -396,7 +398,7 @@ public class MultiHopFlowCompilerTest {
   @Test (dependsOnMethods = "testCompileFlowWithRetention")
   public void testCompileFlowAfterFirstEdgeDeletion() throws 
URISyntaxException, IOException {
     //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
-    this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+    this.flowGraph.deleteFlowEdge("HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt");
 
     FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", 
false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -415,7 +417,7 @@ public class MultiHopFlowCompilerTest {
     String flowGroup = "testFlowGroup";
     String flowName = "testFlowName";
     String expectedJobName1 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2");
+        join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2", 
"localToHdfs");
     String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName1, expectedJobName1);
     String from = jobConfig.getString("from");
@@ -445,7 +447,7 @@ public class MultiHopFlowCompilerTest {
     jobExecutionPlan = secondHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
     String expectedJobName2 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2", 
"HDFS-2");
+        join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2", 
"HDFS-2", "hdfsConvertToJsonAndEncrypt");
     String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName2, expectedJobName2);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName1);
@@ -465,7 +467,7 @@ public class MultiHopFlowCompilerTest {
     jobExecutionPlan = thirdHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
     String expectedJobName3 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4");
+        join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4", "hdfsToHdfs");
     String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName3, expectedJobName3);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName2);
@@ -490,7 +492,7 @@ public class MultiHopFlowCompilerTest {
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
 
     String expectedJobName4 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1");
+        join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1", 
"hdfsToAdl");
     String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName4, expectedJobName4);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName3);
@@ -518,7 +520,7 @@ public class MultiHopFlowCompilerTest {
   @Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion")
   public void testCompileFlowAfterSecondEdgeDeletion() throws 
URISyntaxException, IOException {
     //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
-    this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+    this.flowGraph.deleteFlowEdge("HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt");
 
     FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", 
false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -540,7 +542,7 @@ public class MultiHopFlowCompilerTest {
     DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0);
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     String expectedJobName = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3");
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", 
"hdfsToHdfs");
     String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertEquals(jobName, expectedJobName);
   }
@@ -558,9 +560,9 @@ public class MultiHopFlowCompilerTest {
     //First hop must be from LocalFS to HDFS-1 and HDFS-2
     Set<String> jobNames = new HashSet<>();
     
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", 
"HDFS-1"));
+        join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", "HDFS-1", 
"localToHdfs"));
     
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", 
"HDFS-2"));
+        join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", "HDFS-2", 
"localToHdfs"));
 
     for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
       Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
@@ -571,9 +573,9 @@ public class MultiHopFlowCompilerTest {
     //Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively.
     jobNames = new HashSet<>();
     
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3"));
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", 
"hdfsToHdfs"));
     
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-2", "HDFS-4"));
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-2", "HDFS-4", 
"hdfsToHdfs"));
     for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
       List<DagNode<JobExecutionPlan>> nextNodes = jobDag.getChildren(dagNode);
       Assert.assertEquals(nextNodes.size(), 1);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 4ffdb67..d29ed74 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -34,8 +35,11 @@ import org.testng.annotations.Test;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -43,6 +47,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
 import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
@@ -113,4 +118,48 @@ public class JobExecutionPlanDagFactoryTest {
     Assert.assertTrue(nodeSet.contains("job2"));
     Assert.assertTrue(nodeSet.contains("job3"));
   }
+
+  @Test
+  public void testCreateDagWithDuplicateJobNames() throws Exception {
+    Config flowConfig1 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup").build();
+    Config flowConfig2 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup").build();
+    Config flowConfig3 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup").build();
+    List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, 
flowConfig3);
+
+    Config jobConfig1 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName1").build();
+    Config jobConfig2 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName2").build();
+    Config jobConfig3 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName3").build();
+    List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, 
jobConfig3);
+
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    for (int i = 0; i < 3; i++) {
+      Config jobConfig = jobConfigs.get(i);
+      if (i > 0) {
+        String previousJobName = 
jobExecutionPlans.get(jobExecutionPlans.size() - 
1).getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef(previousJobName));
+      }
+
+      FlowSpec flowSpec = 
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+      jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec, 
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+          ConfigValueFactory.fromAnyRef("testUri")), null, 0L, 
ConfigFactory.empty()));
+    }
+
+    Dag<JobExecutionPlan> dag = new 
JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+    Assert.assertEquals(dag.getNodes().size(), 3);
+    Assert.assertNull(dag.getNodes().get(0).getParentNodes());
+    Assert.assertEquals(dag.getNodes().get(1).getParentNodes().size(), 1);
+    Assert.assertEquals(dag.getNodes().get(2).getParentNodes().size(), 1);
+    Assert.assertEquals(dag.getNodes().get(1).getParentNodes().get(0), 
dag.getNodes().get(0));
+    Assert.assertEquals(dag.getNodes().get(2).getParentNodes().get(0), 
dag.getNodes().get(1));
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
index b01722c..116376c 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
@@ -1,5 +1,5 @@
 flow.edge.source=ADLS-1
 flow.edge.destination=ADLS-1
-flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
+flow.edge.id=ADLS-1_ADLS-1_hdfsRemoteRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=azkaban03
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
index def8cb9..b5bd122 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
@@ -1,5 +1,5 @@
 flow.edge.source=ADLS-1
 flow.edge.destination=ADLS-1
-flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
+flow.edge.id=ADLS-1_ADLS-1_hdfsRemoteRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=azkaban04
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
index 110c665..c49899c 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-1
-flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
+flow.edge.id=HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
 flow.edge.specExecutors=azkaban01
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
index 8e5b5ef..596e66d 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-1
-flow.edge.id=HDFS-1:HDFS-1:hdfsRetention
+flow.edge.id=HDFS-1_HDFS-1_hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=azkaban01
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
index 5abd015..7e4fde1 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -1,6 +1,6 @@
 flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-3
-flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
+flow.edge.id=HDFS-1_HDFS-3_hdfsToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
 flow.edge.specExecutors=azkaban01
 
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
index 8b54a22..082b639 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-2
-flow.edge.id=HDFS-2:HDFS-2:hdfsRetention
+flow.edge.id=HDFS-2_HDFS-2_hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=azkaban02
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
index 24339ec..d104e37 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-2
-flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
+flow.edge.id=HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
 flow.edge.specExecutors=azkaban02
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
index 268a414..6b76a7f 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-4
-flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
+flow.edge.id=HDFS-2_HDFS-4_hdfsToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
 flow.edge.specExecutors=azkaban02
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
index 3471277..656248a 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -1,6 +1,6 @@
 flow.edge.source=HDFS-3
 flow.edge.destination=ADLS-1
-flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
+flow.edge.id=HDFS-3_ADLS-1_hdfsToAdl
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
 flow.edge.specExecutors=azkaban03
 # Proxy config
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
index f1d1adf..ecc6d4e 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-3
 flow.edge.destination=HDFS-3
-flow.edge.id=HDFS-3:HDFS-3:hdfsRetention
+flow.edge.id=HDFS-3_HDFS-3_hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=azkaban03
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
index 9a08893..08f6209 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -1,6 +1,6 @@
 flow.edge.source=HDFS-4
 flow.edge.destination=ADLS-1
-flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
+flow.edge.id=HDFS-4_ADLS-1_hdfsToAdl
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
 flow.edge.specExecutors=azkaban04
 
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
index 0a005c8..fbb5a0e 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
@@ -1,5 +1,5 @@
 flow.edge.source=HDFS-4
 flow.edge.destination=HDFS-4
-flow.edge.id=HDFS-4:HDFS-4:hdfsRetention
+flow.edge.id=HDFS-4_HDFS-4_hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=azkaban04
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
index 8a3809e..5a4d67a 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -1,5 +1,5 @@
 flow.edge.source=LocalFS-1
 flow.edge.destination=HDFS-1
-flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
+flow.edge.id=LocalFS-1_HDFS-1_localToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
 flow.edge.specExecutors=local01
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
index 626a12f..f4cd2cf 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -1,5 +1,5 @@
 flow.edge.source=LocalFS-1
 flow.edge.destination=HDFS-2
-flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
+flow.edge.id=LocalFS-1_HDFS-2_localToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
 flow.edge.specExecutors=local01
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
index c1f17eb..d842471 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
@@ -1,5 +1,5 @@
 flow.edge.source=LocalFS-1
 flow.edge.destination=LocalFS-1
-flow.edge.id=LocalFS-1:LocalFS-1:localRetention
+flow.edge.id=LocalFS-1_LocalFS-1_localRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
 flow.edge.specExecutors=local01
\ No newline at end of file

Reply via email to