This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 37078ed [GOBBLIN-853] Support multiple paths specified in flow config
37078ed is described below
commit 37078ed6a40f73b2560863e3f73e0e1d67c05c87
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Oct 2 10:49:10 2019 -0700
[GOBBLIN-853] Support multiple paths specified in flow config
Closes #2709 from jack-moseley/split-flowspec
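
For context: the new keys added here let a single flow config enumerate several dataset sub-paths, which the compiler expands into one flow spec per dataset. A minimal sketch of such a flow config (the keys come from this change and the new test resource flow/flow3.conf; the path values below are illustrative only):

    gobblin.flow.dataset.baseInputPath=/data/input
    gobblin.flow.dataset.baseOutputPath=/data/output
    gobblin.flow.dataset.subPaths="dataset0,dataset1,dataset2"

With a config like this, MultiHopFlowCompiler compiles one path per sub-path (e.g. /data/input/dataset0 -> /data/output/dataset0) and merges the per-dataset DAGs into a single Dag of JobExecutionPlans.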
---
.../gobblin/configuration/ConfigurationKeys.java | 3 +
.../service/modules/flow/MultiHopFlowCompiler.java | 70 +++++++++++++++++++---
.../service/modules/spec/JobExecutionPlan.java | 8 ++-
.../modules/flow/MultiHopFlowCompilerTest.java | 52 ++++++++++++----
gobblin-service/src/test/resources/flow/flow3.conf | 13 ++++
.../hdfsSnapshotRetention/flow.conf | 18 ++++++
.../flowEdgeTemplates/hdfsToHdfs/flow.conf | 11 +++-
7 files changed, 151 insertions(+), 24 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a485427..dc8b8cb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -895,6 +895,9 @@ public class ConfigurationKeys {
public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
+ public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
+ public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
+ public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
/***
* Configuration properties related to TopologySpec Store
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index d9a3812..67d1f19 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -27,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +37,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -51,10 +54,10 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -169,18 +172,23 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
- Dag<JobExecutionPlan> jobExecutionPlanDag;
+ List<FlowSpec> flowSpecs = splitFlowSpec(flowSpec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = new Dag<>(new ArrayList<>());
try {
this.rwLock.readLock().lock();
- //Compute the path from source to destination.
- FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
- //Convert the path into a Dag of JobExecutionPlans.
- if (flowGraphPath != null) {
- jobExecutionPlanDag = flowGraphPath.asDag(this.config);
- } else {
+ for (FlowSpec datasetFlowSpec : flowSpecs) {
+ //Compute the path from source to destination.
+ FlowGraphPath flowGraphPath = flowGraph.findPath(datasetFlowSpec);
+ if (flowGraphPath != null) {
+ //Convert the path into a Dag of JobExecutionPlans.
+ jobExecutionPlanDag = jobExecutionPlanDag.merge(flowGraphPath.asDag(this.config));
+ }
+ }
+
+ if (jobExecutionPlanDag.isEmpty()) {
Instrumented.markMeter(flowCompilationFailedMeter);
log.info(String.format("No path found from source: %s and destination:
%s", source, destination));
- return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
+ return jobExecutionPlanDag;
}
} catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
Instrumented.markMeter(flowCompilationFailedMeter);
@@ -198,6 +206,50 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
}
/**
+ * If {@link FlowSpec} has {@link ConfigurationKeys#DATASET_SUBPATHS_KEY}, split it into multiple flowSpecs using a
+ * provided base input and base output path to generate multiple source/destination paths.
+ private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
+ long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
+ List<FlowSpec> flowSpecs = new ArrayList<>();
+
+ if (flowSpec.getConfig().hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
+ List<String> datasetSubpaths =
ConfigUtils.getStringList(flowSpec.getConfig(),
ConfigurationKeys.DATASET_SUBPATHS_KEY);
+ String baseInputPath = ConfigUtils.getString(flowSpec.getConfig(),
ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
+ String baseOutputPath = ConfigUtils.getString(flowSpec.getConfig(),
ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
+
+ for (String subPath : datasetSubpaths) {
+ Config newConfig = flowSpec.getConfig().withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
+     .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+     .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+         ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
+     .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+         ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
+ flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
+ }
+ } else {
+ flowSpecs.add(flowSpec);
+ }
+
+ return flowSpecs;
+ }
+
+ private static FlowSpec copyFlowSpecWithNewConfig(FlowSpec flowSpec, Config newConfig) {
+ FlowSpec.Builder builder = FlowSpec.builder(flowSpec.getUri()).withVersion(flowSpec.getVersion())
+     .withDescription(flowSpec.getDescription()).withConfig(newConfig);
+
+ if (flowSpec.getTemplateURIs().isPresent()) {
+ builder = builder.withTemplates(flowSpec.getTemplateURIs().get());
+ }
+
+ if (flowSpec.getChildSpecs().isPresent()) {
+ builder = builder.withChildSpecs(flowSpec.getChildSpecs().get());
+ }
+
+ return builder.build();
+ }
+
+ /**
* Register a shutdown hook for this thread.
*/
private void addShutdownHook() {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 27d76cb..82d2bae 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -43,6 +43,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
@@ -86,12 +87,15 @@ public class JobExecutionPlan {
String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
+ String flowInputPath = ConfigUtils.getString(flowConfig, DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX
+     + "." + DatasetDescriptorConfigKeys.PATH_KEY, "");
String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
- //Modify the job name to include the flow group, flow name and edge id.
- jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId);
+ // Modify the job name to include the flow group, flow name, edge id, and a hash of the flow input path to avoid
+ // collisions, since job names are assumed to be unique within a dag.
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, flowInputPath.hashCode());
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
    .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index c731714..b0faa6d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -246,7 +246,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1", "localToHdfs");
String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName1, expectedJobName1);
+ Assert.assertTrue(jobName1.startsWith(expectedJobName1));
String from = jobConfig.getString("from");
String to = jobConfig.getString("to");
Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -276,7 +276,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", "HDFS-1", "hdfsConvertToJsonAndEncrypt");
String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName2, expectedJobName2);
+ Assert.assertTrue(jobName2.startsWith(expectedJobName2));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1);
from = jobConfig.getString("from");
to = jobConfig.getString("to");
@@ -296,7 +296,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName3, expectedJobName3);
+ Assert.assertTrue(jobName3.startsWith(expectedJobName3));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2);
from = jobConfig.getString("from");
to = jobConfig.getString("to");
@@ -320,7 +320,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1", "hdfsToAdl");
String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName4, expectedJobName4);
+ Assert.assertTrue(jobName4.startsWith(expectedJobName4));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3);
from = jobConfig.getString("from");
to = jobConfig.getString("to");
@@ -383,7 +383,7 @@ public class MultiHopFlowCompilerTest {
for (DagNode<JobExecutionPlan> dagNode : currentHopNodes) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertTrue(jobNames.contains(jobName));
+ Assert.assertTrue(jobNames.stream().anyMatch(jobName::startsWith));
log.warn(jobName);
nextHopNodes.addAll(jobDag.getChildren(dagNode));
}
@@ -419,7 +419,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2", "localToHdfs");
String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName1, expectedJobName1);
+ Assert.assertTrue(jobName1.startsWith(expectedJobName1));
String from = jobConfig.getString("from");
String to = jobConfig.getString("to");
Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -449,7 +449,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2", "HDFS-2", "hdfsConvertToJsonAndEncrypt");
String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName2, expectedJobName2);
+ Assert.assertTrue(jobName2.startsWith(expectedJobName2));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1);
from = jobConfig.getString("from");
to = jobConfig.getString("to");
@@ -469,7 +469,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4", "hdfsToHdfs");
String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName3, expectedJobName3);
+ Assert.assertTrue(jobName3.startsWith(expectedJobName3));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2);
from = jobConfig.getString("from");
to = jobConfig.getString("to");
@@ -494,7 +494,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1", "hdfsToAdl");
String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName4, expectedJobName4);
+ Assert.assertTrue(jobName4.startsWith(expectedJobName4));
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3);
from = jobConfig.getString("from");
to = jobConfig.getString("to");
@@ -544,7 +544,7 @@ public class MultiHopFlowCompilerTest {
String expectedJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
    join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertEquals(jobName, expectedJobName);
+ Assert.assertTrue(jobName.startsWith(expectedJobName));
}
@@ -567,7 +567,7 @@ public class MultiHopFlowCompilerTest {
for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertTrue(jobNames.contains(jobName));
+ Assert.assertTrue(jobNames.stream().anyMatch(jobName::startsWith));
}
//Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively.
@@ -581,11 +581,39 @@ public class MultiHopFlowCompilerTest {
Assert.assertEquals(nextNodes.size(), 1);
Config jobConfig = nextNodes.get(0).getValue().getJobSpec().getConfig();
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- Assert.assertTrue(jobNames.contains(jobName));
+ Assert.assertTrue(jobNames.stream().anyMatch(jobName::startsWith));
}
}
@Test (dependsOnMethods = "testMulticastPath")
+ public void testCompileMultiDatasetFlow() throws Exception {
+ FlowSpec spec = createFlowSpec("flow/flow3.conf", "HDFS-1", "HDFS-3",
true, false);
+
+ Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
+
+ // Should be 3 parallel jobs, one for each dataset, with copy -> retention
+ Assert.assertEquals(dag.getNodes().size(), 6);
+ Assert.assertEquals(dag.getEndNodes().size(), 3);
+ Assert.assertEquals(dag.getStartNodes().size(), 3);
+
+ String copyJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+     join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
+ for (DagNode<JobExecutionPlan> dagNode : dag.getStartNodes()) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+ Assert.assertTrue(jobName.startsWith(copyJobName));
+ }
+
+ String retentionJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+     join("testFlowGroup", "testFlowName", "SnapshotRetention", "HDFS-3", "HDFS-3", "hdfsRetention");
+ for (DagNode<JobExecutionPlan> dagNode : dag.getEndNodes()) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+ Assert.assertTrue(jobName.startsWith(retentionJobName));
+ }
+ }
+
+ @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
public void testGitFlowGraphMonitorService()
    throws IOException, GitAPIException, URISyntaxException, InterruptedException {
File remoteDir = new File(TESTDIR + "/remote");
diff --git a/gobblin-service/src/test/resources/flow/flow3.conf b/gobblin-service/src/test/resources/flow/flow3.conf
new file mode 100644
index 0000000..574bd32
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow3.conf
@@ -0,0 +1,13 @@
+user.to.proxy=testUser
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.dataset.baseInputPath=/data/input
+
+#Output dataset - same as input dataset
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.dataset.baseOutputPath=/data/output
+
+gobblin.flow.dataset.subPaths="dataset0,dataset1,dataset2"
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
index 8560ebf..09a16f0 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
@@ -33,3 +33,21 @@
gobblin.flow.edge.output.dataset.descriptor.3.class=${gobblin.flow.edge.input.dataset.descriptor.3.class}
gobblin.flow.edge.output.dataset.descriptor.3.platform=${gobblin.flow.edge.input.dataset.descriptor.3.platform}
gobblin.flow.edge.output.dataset.descriptor.3.path=/data/encrypted/${team.name}/${dataset.name}
gobblin.flow.edge.output.dataset.descriptor.3.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.4.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.4.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.4.path=${gobblin.flow.input.dataset.descriptor.path}
+
+gobblin.flow.edge.output.dataset.descriptor.4.class=${gobblin.flow.edge.input.dataset.descriptor.4.class}
+gobblin.flow.edge.output.dataset.descriptor.4.platform=${gobblin.flow.edge.input.dataset.descriptor.4.platform}
+gobblin.flow.edge.output.dataset.descriptor.4.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.edge.output.dataset.descriptor.4.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.5.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.5.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.5.path=${gobblin.flow.output.dataset.descriptor.path}
+
+gobblin.flow.edge.output.dataset.descriptor.5.class=${gobblin.flow.edge.input.dataset.descriptor.5.class}
+gobblin.flow.edge.output.dataset.descriptor.5.platform=${gobblin.flow.edge.input.dataset.descriptor.5.platform}
+gobblin.flow.edge.output.dataset.descriptor.5.path=${gobblin.flow.output.dataset.descriptor.path}
+gobblin.flow.edge.output.dataset.descriptor.5.isRetentionApplied=true
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
index 4563780..d029a31 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -14,4 +14,13 @@
gobblin.flow.edge.input.dataset.descriptor.1.isRetentionApplied=${flow.applyRetention}
gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
-gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
+
+gobblin.flow.edge.input.dataset.descriptor.2.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.2.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.2.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.edge.input.dataset.descriptor.2.isRetentionApplied=${flow.applyRetention}
+
+gobblin.flow.edge.output.dataset.descriptor.2.class=${gobblin.flow.edge.input.dataset.descriptor.2.class}
+gobblin.flow.edge.output.dataset.descriptor.2.platform=${gobblin.flow.edge.input.dataset.descriptor.2.platform}
+gobblin.flow.edge.output.dataset.descriptor.2.path=${gobblin.flow.output.dataset.descriptor.path}
\ No newline at end of file