This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 37078ed  [GOBBLIN-853] Support multiple paths specified in flow config
37078ed is described below

commit 37078ed6a40f73b2560863e3f73e0e1d67c05c87
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Oct 2 10:49:10 2019 -0700

    [GOBBLIN-853] Support multiple paths specified in flow config
    
    Closes #2709 from jack-moseley/split-flowspec
---
 .../gobblin/configuration/ConfigurationKeys.java   |  3 +
 .../service/modules/flow/MultiHopFlowCompiler.java | 70 +++++++++++++++++++---
 .../service/modules/spec/JobExecutionPlan.java     |  8 ++-
 .../modules/flow/MultiHopFlowCompilerTest.java     | 52 ++++++++++++----
 gobblin-service/src/test/resources/flow/flow3.conf | 13 ++++
 .../hdfsSnapshotRetention/flow.conf                | 18 ++++++
 .../flowEdgeTemplates/hdfsToHdfs/flow.conf         | 11 +++-
 7 files changed, 151 insertions(+), 24 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a485427..dc8b8cb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -895,6 +895,9 @@ public class ConfigurationKeys {
   public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
  public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
+  public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
+  public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
+  public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
 
   /***
    * Configuration properties related to TopologySpec Store
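
For orientation, a hedged sketch of how these three new keys are meant to be read together (the key names are from this commit; the config values below are illustrative only):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class DatasetPathKeysSketch {
      public static void main(String[] args) {
        // Hypothetical flow config exercising the new keys.
        Config flowConfig = ConfigFactory.parseString(
            "gobblin.flow.dataset.baseInputPath=/data/input\n"
                + "gobblin.flow.dataset.baseOutputPath=/data/output\n"
                + "gobblin.flow.dataset.subPaths=\"dataset0,dataset1\"");
        // The compiler (see splitFlowSpec below) derives one source/destination
        // path pair per subPath:
        //   /data/input/dataset0 -> /data/output/dataset0
        //   /data/input/dataset1 -> /data/output/dataset1
        System.out.println(flowConfig.getString("gobblin.flow.dataset.subPaths"));
      }
    }
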
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index d9a3812..67d1f19 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -27,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +37,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ServiceManager;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -51,10 +54,10 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -169,18 +172,23 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
         ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
 
-    Dag<JobExecutionPlan> jobExecutionPlanDag;
+    List<FlowSpec> flowSpecs = splitFlowSpec(flowSpec);
+    Dag<JobExecutionPlan> jobExecutionPlanDag = new Dag<>(new ArrayList<>());
     try {
       this.rwLock.readLock().lock();
-      //Compute the path from source to destination.
-      FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
-      //Convert the path into a Dag of JobExecutionPlans.
-      if (flowGraphPath != null) {
-        jobExecutionPlanDag = flowGraphPath.asDag(this.config);
-      } else {
+      for (FlowSpec datasetFlowSpec : flowSpecs) {
+        //Compute the path from source to destination.
+        FlowGraphPath flowGraphPath = flowGraph.findPath(datasetFlowSpec);
+        if (flowGraphPath != null) {
+          //Convert the path into a Dag of JobExecutionPlans.
+          jobExecutionPlanDag = jobExecutionPlanDag.merge(flowGraphPath.asDag(this.config));
+        }
+      }
+
+      if (jobExecutionPlanDag.isEmpty()) {
         Instrumented.markMeter(flowCompilationFailedMeter);
        log.info(String.format("No path found from source: %s and destination: %s", source, destination));
-        return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
+        return jobExecutionPlanDag;
       }
    } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
       Instrumented.markMeter(flowCompilationFailedMeter);
@@ -198,6 +206,50 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   }
 
   /**
+   * If {@link FlowSpec} has {@link ConfigurationKeys#DATASET_SUBPATHS_KEY}, split it into multiple flowSpecs using a
+   * provided base input and base output path to generate multiple source/destination paths.
+   */
+  private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
+    long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
+    List<FlowSpec> flowSpecs = new ArrayList<>();
+
+    if (flowSpec.getConfig().hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
+      List<String> datasetSubpaths = ConfigUtils.getStringList(flowSpec.getConfig(), ConfigurationKeys.DATASET_SUBPATHS_KEY);
+      String baseInputPath = ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
+      String baseOutputPath = ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
+
+      for (String subPath : datasetSubpaths) {
+        Config newConfig = flowSpec.getConfig().withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
+            .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+            .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+                ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
+            .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+                ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
+        flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
+      }
+    } else {
+      flowSpecs.add(flowSpec);
+    }
+
+    return flowSpecs;
+  }
+
+  private static FlowSpec copyFlowSpecWithNewConfig(FlowSpec flowSpec, Config newConfig) {
+    FlowSpec.Builder builder = FlowSpec.builder(flowSpec.getUri()).withVersion(flowSpec.getVersion())
+        .withDescription(flowSpec.getDescription()).withConfig(newConfig);
+
+    if (flowSpec.getTemplateURIs().isPresent()) {
+      builder = builder.withTemplates(flowSpec.getTemplateURIs().get());
+    }
+
+    if (flowSpec.getChildSpecs().isPresent()) {
+      builder = builder.withChildSpecs(flowSpec.getChildSpecs().get());
+    }
+
+    return builder.build();
+  }
+
+  /**
    * Register a shutdown hook for this thread.
    */
   private void addShutdownHook() {
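
A minimal, self-contained sketch of the per-subPath expansion that splitFlowSpec performs above (flow-spec plumbing omitted; the paths and subPath values are illustrative, not from this commit):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;

    public class SplitFlowSpecSketch {
      public static void main(String[] args) {
        String baseInputPath = "/data/input";   // DATASET_BASE_INPUT_PATH_KEY (defaults to "/")
        String baseOutputPath = "/data/output"; // DATASET_BASE_OUTPUT_PATH_KEY (defaults to "/")
        List<String> subPaths = Arrays.asList("dataset0", "dataset1"); // DATASET_SUBPATHS_KEY

        for (String subPath : subPaths) {
          // Each subPath yields one FlowSpec whose input/output dataset descriptor
          // paths are base + subPath; the per-dataset dags are then merged into a
          // single Dag<JobExecutionPlan> by compileFlow.
          String input = new Path(baseInputPath, subPath).toString();
          String output = new Path(baseOutputPath, subPath).toString();
          System.out.println(input + " -> " + output); // e.g. /data/input/dataset0 -> /data/output/dataset0
        }
      }
    }
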
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 27d76cb..82d2bae 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -43,6 +43,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
@@ -86,12 +87,15 @@ public class JobExecutionPlan {
       String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
       String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
       String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
+      String flowInputPath = ConfigUtils.getString(flowConfig, DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX
+          + "." + DatasetDescriptorConfigKeys.PATH_KEY, "");
 
       String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
       String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
 
-      //Modify the job name to include the flow group, flow name and edge id.
-      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId);
+      // Modify the job name to include the flow group, flow name, edge id, and a hash of the flow input path to avoid
+      // collisions, since job names are assumed to be unique within a dag.
+      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, flowInputPath.hashCode());
 
       JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
           .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
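
A short sketch of the resulting job-name scheme ("_" stands in for the actual JOB_NAME_COMPONENT_SEPARATION_CHAR defined in this class; the component values are illustrative):

    import com.google.common.base.Joiner;

    public class JobNameSketch {
      public static void main(String[] args) {
        // Per-dataset input path as set by MultiHopFlowCompiler.splitFlowSpec.
        String flowInputPath = "/data/input/dataset0";
        // "_" is assumed here in place of JOB_NAME_COMPONENT_SEPARATION_CHAR.
        String jobName = Joiner.on("_")
            .join("testFlowGroup", "testFlowName", "Distcp", "hdfsToHdfs", flowInputPath.hashCode());
        // Distinct dataset paths hash to distinct suffixes, keeping job names
        // unique when the per-dataset dags are merged into one dag.
        System.out.println(jobName); // e.g. testFlowGroup_testFlowName_Distcp_hdfsToHdfs_<hash>
      }
    }
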
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index c731714..b0faa6d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -246,7 +246,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1", "localToHdfs");
     String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName1, expectedJobName1);
+    Assert.assertTrue(jobName1.startsWith(expectedJobName1));
     String from = jobConfig.getString("from");
     String to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -276,7 +276,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", "HDFS-1", "hdfsConvertToJsonAndEncrypt");
     String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName2, expectedJobName2);
+    Assert.assertTrue(jobName2.startsWith(expectedJobName2));
     Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -296,7 +296,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
     String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName3, expectedJobName3);
+    Assert.assertTrue(jobName3.startsWith(expectedJobName3));
     Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -320,7 +320,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1", "hdfsToAdl");
     String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName4, expectedJobName4);
+    Assert.assertTrue(jobName4.startsWith(expectedJobName4));
     Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -383,7 +383,7 @@ public class MultiHopFlowCompilerTest {
       for (DagNode<JobExecutionPlan> dagNode : currentHopNodes) {
         Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
         String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-        Assert.assertTrue(jobNames.contains(jobName));
+        Assert.assertTrue(jobNames.stream().anyMatch(jobName::startsWith));
         log.warn(jobName);
         nextHopNodes.addAll(jobDag.getChildren(dagNode));
       }
@@ -419,7 +419,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2", "localToHdfs");
     String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName1, expectedJobName1);
+    Assert.assertTrue(jobName1.startsWith(expectedJobName1));
     String from = jobConfig.getString("from");
     String to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -449,7 +449,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2", "HDFS-2", "hdfsConvertToJsonAndEncrypt");
     String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName2, expectedJobName2);
+    Assert.assertTrue(jobName2.startsWith(expectedJobName2));
     Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -469,7 +469,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4", "hdfsToHdfs");
     String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName3, expectedJobName3);
+    Assert.assertTrue(jobName3.startsWith(expectedJobName3));
     Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -494,7 +494,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1", "hdfsToAdl");
     String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName4, expectedJobName4);
+    Assert.assertTrue(jobName4.startsWith(expectedJobName4));
     Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -544,7 +544,7 @@ public class MultiHopFlowCompilerTest {
     String expectedJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
         join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
     String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertEquals(jobName, expectedJobName);
+    Assert.assertTrue(jobName.startsWith(expectedJobName));
   }
 
 
@@ -567,7 +567,7 @@ public class MultiHopFlowCompilerTest {
     for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
       Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
       String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-      Assert.assertTrue(jobNames.contains(jobName));
+      Assert.assertTrue(jobNames.stream().anyMatch(jobName::startsWith));
     }
 
     //Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively.
@@ -581,11 +581,39 @@ public class MultiHopFlowCompilerTest {
       Assert.assertEquals(nextNodes.size(), 1);
       Config jobConfig = nextNodes.get(0).getValue().getJobSpec().getConfig();
       String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-      Assert.assertTrue(jobNames.contains(jobName));
+      Assert.assertTrue(jobNames.stream().anyMatch(jobName::startsWith));
     }
   }
 
   @Test (dependsOnMethods = "testMulticastPath")
+  public void testCompileMultiDatasetFlow() throws Exception {
+    FlowSpec spec = createFlowSpec("flow/flow3.conf", "HDFS-1", "HDFS-3", true, false);
+
+    Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
+
+    // Should be 3 parallel jobs, one for each dataset, with copy -> retention
+    Assert.assertEquals(dag.getNodes().size(), 6);
+    Assert.assertEquals(dag.getEndNodes().size(), 3);
+    Assert.assertEquals(dag.getStartNodes().size(), 3);
+
+    String copyJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
+    for (DagNode<JobExecutionPlan> dagNode : dag.getStartNodes()) {
+      Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+      String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+      Assert.assertTrue(jobName.startsWith(copyJobName));
+    }
+
+    String retentionJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "SnapshotRetention", "HDFS-3", "HDFS-3", "hdfsRetention");
+    for (DagNode<JobExecutionPlan> dagNode : dag.getEndNodes()) {
+      Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+      String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+      Assert.assertTrue(jobName.startsWith(retentionJobName));
+    }
+  }
+
+  @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
   public void testGitFlowGraphMonitorService()
      throws IOException, GitAPIException, URISyntaxException, InterruptedException {
     File remoteDir = new File(TESTDIR + "/remote");
diff --git a/gobblin-service/src/test/resources/flow/flow3.conf b/gobblin-service/src/test/resources/flow/flow3.conf
new file mode 100644
index 0000000..574bd32
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow3.conf
@@ -0,0 +1,13 @@
+user.to.proxy=testUser
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.dataset.baseInputPath=/data/input
+
+#Output dataset - same as input dataset
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.dataset.baseOutputPath=/data/output
+
+gobblin.flow.dataset.subPaths="dataset0,dataset1,dataset2"
\ No newline at end of file
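
Note that gobblin.flow.dataset.subPaths above is one quoted string; ConfigUtils.getStringList is assumed to split such comma-separated values into a list, roughly equivalent to this sketch:

    import java.util.List;
    import com.google.common.base.Splitter;

    public class SubPathsParsingSketch {
      public static void main(String[] args) {
        // Comma-splitting of the quoted value from flow3.conf.
        List<String> subPaths =
            Splitter.on(',').trimResults().splitToList("dataset0,dataset1,dataset2");
        System.out.println(subPaths); // [dataset0, dataset1, dataset2]
      }
    }
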
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
index 8560ebf..09a16f0 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
@@ -33,3 +33,21 @@ gobblin.flow.edge.output.dataset.descriptor.3.class=${gobblin.flow.edge.input.da
 gobblin.flow.edge.output.dataset.descriptor.3.platform=${gobblin.flow.edge.input.dataset.descriptor.3.platform}
 gobblin.flow.edge.output.dataset.descriptor.3.path=/data/encrypted/${team.name}/${dataset.name}
 gobblin.flow.edge.output.dataset.descriptor.3.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.4.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.4.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.4.path=${gobblin.flow.input.dataset.descriptor.path}
+
+gobblin.flow.edge.output.dataset.descriptor.4.class=${gobblin.flow.edge.input.dataset.descriptor.4.class}
+gobblin.flow.edge.output.dataset.descriptor.4.platform=${gobblin.flow.edge.input.dataset.descriptor.4.platform}
+gobblin.flow.edge.output.dataset.descriptor.4.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.edge.output.dataset.descriptor.4.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.5.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.5.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.5.path=${gobblin.flow.output.dataset.descriptor.path}
+
+gobblin.flow.edge.output.dataset.descriptor.5.class=${gobblin.flow.edge.input.dataset.descriptor.5.class}
+gobblin.flow.edge.output.dataset.descriptor.5.platform=${gobblin.flow.edge.input.dataset.descriptor.5.platform}
+gobblin.flow.edge.output.dataset.descriptor.5.path=${gobblin.flow.output.dataset.descriptor.path}
+gobblin.flow.edge.output.dataset.descriptor.5.isRetentionApplied=true
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
index 4563780..d029a31 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -14,4 +14,13 @@ gobblin.flow.edge.input.dataset.descriptor.1.isRetentionApplied=${flow.applyRete
 
 gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
 gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
-gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
+
+gobblin.flow.edge.input.dataset.descriptor.2.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.2.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.2.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.edge.input.dataset.descriptor.2.isRetentionApplied=${flow.applyRetention}
+
+gobblin.flow.edge.output.dataset.descriptor.2.class=${gobblin.flow.edge.input.dataset.descriptor.2.class}
+gobblin.flow.edge.output.dataset.descriptor.2.platform=${gobblin.flow.edge.input.dataset.descriptor.2.platform}
+gobblin.flow.edge.output.dataset.descriptor.2.path=${gobblin.flow.output.dataset.descriptor.path}
\ No newline at end of file
