This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 86515b9 [GOBBLIN-810] Include flow edge ID in job name
86515b9 is described below
commit 86515b99fac8d4dd12ca99c79674f0faf572736f
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Jul 15 10:53:15 2019 -0700
[GOBBLIN-810] Include flow edge ID in job name
Closes #2675 from jack-moseley/job-name-conflict
---
.../service/modules/core/GitFlowGraphMonitor.java | 2 +-
.../service/modules/spec/JobExecutionPlan.java | 7 ++--
.../modules/core/GitFlowGraphMonitorTest.java | 2 +-
.../modules/flow/MultiHopFlowCompilerTest.java | 38 +++++++++--------
.../spec/JobExecutionPlanDagFactoryTest.java | 49 ++++++++++++++++++++++
.../adls-1-to-adls-1-retention-1.properties | 2 +-
.../adls-1-to-adls-1-retention-2.properties | 2 +-
.../flowedges/hdfs-1-to-hdfs-1-encrypt.properties | 2 +-
.../hdfs-1-to-hdfs-1-retention.properties | 2 +-
.../flowedges/hdfs-1-to-hdfs-3.properties | 2 +-
.../flowedges/hdfs-2-hdfs-2-retention.properties | 2 +-
.../flowedges/hdfs-2-to-hdfs-2-encrypt.properties | 2 +-
.../flowedges/hdfs-2-to-hdfs-4.properties | 2 +-
.../flowedges/hdfs-3-to-adls-1.properties | 2 +-
.../hdfs-3-to-hdfs-3-retention.properties | 2 +-
.../flowedges/hdfs-4-to-adls-1.properties | 2 +-
.../hdfs-4-to-hdfs-4-retention.properties | 2 +-
.../flowgraph/flowedges/local-to-hdfs-1.properties | 2 +-
.../flowgraph/flowedges/local-to-hdfs-2.properties | 2 +-
.../flowedges/local-to-local-retention.properties | 2 +-
20 files changed, 89 insertions(+), 39 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 5a69371..3d5a7bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -68,7 +68,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
private static final String PROPERTIES_EXTENSIONS = "properties";
private static final String CONF_EXTENSIONS = StringUtils.EMPTY;
- private static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
+ private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index bb7955f..d0ec901 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -85,11 +85,10 @@ public class JobExecutionPlan {
String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
- String source = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
- String destination = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "");
+ String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
- //Modify the job name to include the flow group, flow name and source and destination node ids for the job.
- jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, source, destination);
+ //Modify the job name to include the flow group, flow name and edge id.
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId);
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 6a02652..c07b56e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -342,7 +342,7 @@ public class GitFlowGraphMonitorTest {
Set<FlowEdge> edgeSet = this.flowGraph.getEdges(node1);
Assert.assertEquals(edgeSet.size(), 1);
FlowEdge flowEdge = edgeSet.iterator().next();
- Assert.assertEquals(flowEdge.getId(), Joiner.on(":").join(node1, node2,
edgeName));
+ Assert.assertEquals(flowEdge.getId(), Joiner.on("_").join(node1, node2,
edgeName));
Assert.assertEquals(flowEdge.getSrc(), node1);
Assert.assertEquals(flowEdge.getDest(), node2);
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
"/tmp1");
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 4bffdb1..c731714 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -85,6 +84,7 @@ import
org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
@@ -244,7 +244,7 @@ public class MultiHopFlowCompilerTest {
String flowGroup = "testFlowGroup";
String flowName = "testFlowName";
String expectedJobName1 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1");
+ join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1",
"localToHdfs");
String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName1, expectedJobName1);
String from = jobConfig.getString("from");
@@ -274,7 +274,7 @@ public class MultiHopFlowCompilerTest {
jobSpecWithExecutor = secondHopNode.getValue();
jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
String expectedJobName2 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1",
"HDFS-1");
+ join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1",
"HDFS-1", "hdfsConvertToJsonAndEncrypt");
String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName2, expectedJobName2);
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES),
jobName1);
@@ -294,7 +294,7 @@ public class MultiHopFlowCompilerTest {
jobSpecWithExecutor = thirdHopNode.getValue();
jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
String expectedJobName3 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3");
+ join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName3, expectedJobName3);
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES),
jobName2);
@@ -318,7 +318,7 @@ public class MultiHopFlowCompilerTest {
jobSpecWithExecutor = fourthHopNode.getValue();
jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
String expectedJobName4 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1");
+ join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1",
"hdfsToAdl");
String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName4, expectedJobName4);
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES),
jobName3);
@@ -362,6 +362,8 @@ public class MultiHopFlowCompilerTest {
"Distcp", "SnapshotRetention", "DistcpToADL", "SnapshotRetention");
List<String> sourceNodes = Lists.newArrayList("LocalFS-1", "LocalFS-1",
"HDFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1");
List<String> destinationNodes = Lists.newArrayList("LocalFS-1", "HDFS-1",
"HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1", "ADLS-1");
+ List<String> edgeNames = Lists.newArrayList("localRetention",
"localToHdfs", "hdfsRetention",
+ "hdfsConvertToJsonAndEncrypt", "hdfsRetention", "hdfsToHdfs",
"hdfsRetention", "hdfsToAdl", "hdfsRemoteRetention");
List<DagNode<JobExecutionPlan>> nextHopNodes = new ArrayList<>();
for (int i = 0; i < 9; i += 2) {
@@ -372,10 +374,10 @@ public class MultiHopFlowCompilerTest {
}
Set<String> jobNames = new HashSet<>();
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, expectedJobNames.get(i),
sourceNodes.get(i), destinationNodes.get(i)));
+ join(flowGroup, flowName, expectedJobNames.get(i),
sourceNodes.get(i), destinationNodes.get(i), edgeNames.get(i)));
if (i < 8) {
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, expectedJobNames.get(i + 1),
sourceNodes.get(i + 1), destinationNodes.get(i + 1)));
+ join(flowGroup, flowName, expectedJobNames.get(i + 1),
sourceNodes.get(i + 1), destinationNodes.get(i + 1), edgeNames.get(i + 1)));
}
for (DagNode<JobExecutionPlan> dagNode : currentHopNodes) {
@@ -396,7 +398,7 @@ public class MultiHopFlowCompilerTest {
@Test (dependsOnMethods = "testCompileFlowWithRetention")
public void testCompileFlowAfterFirstEdgeDeletion() throws
URISyntaxException, IOException {
//Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
- this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+ this.flowGraph.deleteFlowEdge("HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt");
FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1",
false, false);
Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -415,7 +417,7 @@ public class MultiHopFlowCompilerTest {
String flowGroup = "testFlowGroup";
String flowName = "testFlowName";
String expectedJobName1 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2");
+ join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2",
"localToHdfs");
String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName1, expectedJobName1);
String from = jobConfig.getString("from");
@@ -445,7 +447,7 @@ public class MultiHopFlowCompilerTest {
jobExecutionPlan = secondHopNode.getValue();
jobConfig = jobExecutionPlan.getJobSpec().getConfig();
String expectedJobName2 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2",
"HDFS-2");
+ join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2",
"HDFS-2", "hdfsConvertToJsonAndEncrypt");
String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName2, expectedJobName2);
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES),
jobName1);
@@ -465,7 +467,7 @@ public class MultiHopFlowCompilerTest {
jobExecutionPlan = thirdHopNode.getValue();
jobConfig = jobExecutionPlan.getJobSpec().getConfig();
String expectedJobName3 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4");
+ join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4", "hdfsToHdfs");
String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName3, expectedJobName3);
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES),
jobName2);
@@ -490,7 +492,7 @@ public class MultiHopFlowCompilerTest {
jobConfig = jobExecutionPlan.getJobSpec().getConfig();
String expectedJobName4 =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1");
+ join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1",
"hdfsToAdl");
String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName4, expectedJobName4);
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES),
jobName3);
@@ -518,7 +520,7 @@ public class MultiHopFlowCompilerTest {
@Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion")
public void testCompileFlowAfterSecondEdgeDeletion() throws
URISyntaxException, IOException {
//Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
- this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+ this.flowGraph.deleteFlowEdge("HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt");
FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1",
false, false);
Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -540,7 +542,7 @@ public class MultiHopFlowCompilerTest {
DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0);
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String expectedJobName =
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3");
+ join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3",
"hdfsToHdfs");
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertEquals(jobName, expectedJobName);
}
@@ -558,9 +560,9 @@ public class MultiHopFlowCompilerTest {
//First hop must be from LocalFS to HDFS-1 and HDFS-2
Set<String> jobNames = new HashSet<>();
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1",
"HDFS-1"));
+ join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", "HDFS-1",
"localToHdfs"));
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1",
"HDFS-2"));
+ join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", "HDFS-2",
"localToHdfs"));
for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
@@ -571,9 +573,9 @@ public class MultiHopFlowCompilerTest {
//Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively.
jobNames = new HashSet<>();
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3"));
+ join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3",
"hdfsToHdfs"));
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
- join("testFlowGroup", "testFlowName", "Distcp", "HDFS-2", "HDFS-4"));
+ join("testFlowGroup", "testFlowName", "Distcp", "HDFS-2", "HDFS-4",
"hdfsToHdfs"));
for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
List<DagNode<JobExecutionPlan>> nextNodes = jobDag.getChildren(dagNode);
Assert.assertEquals(nextNodes.size(), 1);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 4ffdb67..d29ed74 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
@@ -34,8 +35,11 @@ import org.testng.annotations.Test;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -43,6 +47,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.template.FlowTemplate;
import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -113,4 +118,48 @@ public class JobExecutionPlanDagFactoryTest {
Assert.assertTrue(nodeSet.contains("job2"));
Assert.assertTrue(nodeSet.contains("job3"));
}
+
+ @Test
+ public void testCreateDagWithDuplicateJobNames() throws Exception {
+ Config flowConfig1 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup").build();
+ Config flowConfig2 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup").build();
+ Config flowConfig3 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup").build();
+ List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2,
flowConfig3);
+
+ Config jobConfig1 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName1").build();
+ Config jobConfig2 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName2").build();
+ Config jobConfig3 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName3").build();
+ List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2,
jobConfig3);
+
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+ for (int i = 0; i < 3; i++) {
+ Config jobConfig = jobConfigs.get(i);
+ if (i > 0) {
+ String previousJobName =
jobExecutionPlans.get(jobExecutionPlans.size() -
1).getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef(previousJobName));
+ }
+
+ FlowSpec flowSpec =
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+ jobExecutionPlans.add(new
JobExecutionPlan.Factory().createPlan(flowSpec,
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+ ConfigValueFactory.fromAnyRef("testUri")), null, 0L,
ConfigFactory.empty()));
+ }
+
+ Dag<JobExecutionPlan> dag = new
JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+
+ Assert.assertEquals(dag.getStartNodes().size(), 1);
+ Assert.assertEquals(dag.getEndNodes().size(), 1);
+ Assert.assertEquals(dag.getNodes().size(), 3);
+ Assert.assertNull(dag.getNodes().get(0).getParentNodes());
+ Assert.assertEquals(dag.getNodes().get(1).getParentNodes().size(), 1);
+ Assert.assertEquals(dag.getNodes().get(2).getParentNodes().size(), 1);
+ Assert.assertEquals(dag.getNodes().get(1).getParentNodes().get(0),
dag.getNodes().get(0));
+ Assert.assertEquals(dag.getNodes().get(2).getParentNodes().get(0),
dag.getNodes().get(1));
+ }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
index b01722c..116376c 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
@@ -1,5 +1,5 @@
flow.edge.source=ADLS-1
flow.edge.destination=ADLS-1
-flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
+flow.edge.id=ADLS-1_ADLS-1_hdfsRemoteRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=azkaban03
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
index def8cb9..b5bd122 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
@@ -1,5 +1,5 @@
flow.edge.source=ADLS-1
flow.edge.destination=ADLS-1
-flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
+flow.edge.id=ADLS-1_ADLS-1_hdfsRemoteRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=azkaban04
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
index 110c665..c49899c 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-1
flow.edge.destination=HDFS-1
-flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
+flow.edge.id=HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
flow.edge.specExecutors=azkaban01
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
index 8e5b5ef..596e66d 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-1
flow.edge.destination=HDFS-1
-flow.edge.id=HDFS-1:HDFS-1:hdfsRetention
+flow.edge.id=HDFS-1_HDFS-1_hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=azkaban01
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
index 5abd015..7e4fde1 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -1,6 +1,6 @@
flow.edge.source=HDFS-1
flow.edge.destination=HDFS-3
-flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
+flow.edge.id=HDFS-1_HDFS-3_hdfsToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
flow.edge.specExecutors=azkaban01
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
index 8b54a22..082b639 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-2
flow.edge.destination=HDFS-2
-flow.edge.id=HDFS-2:HDFS-2:hdfsRetention
+flow.edge.id=HDFS-2_HDFS-2_hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=azkaban02
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
index 24339ec..d104e37 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-2
flow.edge.destination=HDFS-2
-flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
+flow.edge.id=HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
flow.edge.specExecutors=azkaban02
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
index 268a414..6b76a7f 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-2
flow.edge.destination=HDFS-4
-flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
+flow.edge.id=HDFS-2_HDFS-4_hdfsToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
flow.edge.specExecutors=azkaban02
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
index 3471277..656248a 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -1,6 +1,6 @@
flow.edge.source=HDFS-3
flow.edge.destination=ADLS-1
-flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
+flow.edge.id=HDFS-3_ADLS-1_hdfsToAdl
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
flow.edge.specExecutors=azkaban03
# Proxy config
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
index f1d1adf..ecc6d4e 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-3
flow.edge.destination=HDFS-3
-flow.edge.id=HDFS-3:HDFS-3:hdfsRetention
+flow.edge.id=HDFS-3_HDFS-3_hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=azkaban03
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
index 9a08893..08f6209 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -1,6 +1,6 @@
flow.edge.source=HDFS-4
flow.edge.destination=ADLS-1
-flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
+flow.edge.id=HDFS-4_ADLS-1_hdfsToAdl
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
flow.edge.specExecutors=azkaban04
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
index 0a005c8..fbb5a0e 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
@@ -1,5 +1,5 @@
flow.edge.source=HDFS-4
flow.edge.destination=HDFS-4
-flow.edge.id=HDFS-4:HDFS-4:hdfsRetention
+flow.edge.id=HDFS-4_HDFS-4_hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=azkaban04
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
index 8a3809e..5a4d67a 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -1,5 +1,5 @@
flow.edge.source=LocalFS-1
flow.edge.destination=HDFS-1
-flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
+flow.edge.id=LocalFS-1_HDFS-1_localToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
flow.edge.specExecutors=local01
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
index 626a12f..f4cd2cf 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -1,5 +1,5 @@
flow.edge.source=LocalFS-1
flow.edge.destination=HDFS-2
-flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
+flow.edge.id=LocalFS-1_HDFS-2_localToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
flow.edge.specExecutors=local01
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
index c1f17eb..d842471 100644
---
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
+++
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
@@ -1,5 +1,5 @@
flow.edge.source=LocalFS-1
flow.edge.destination=LocalFS-1
-flow.edge.id=LocalFS-1:LocalFS-1:localRetention
+flow.edge.id=LocalFS-1_LocalFS-1_localRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
flow.edge.specExecutors=local01
\ No newline at end of file