Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ce78e619a -> b9149bfd9


[GOBBLIN-606] Fix duplicate addition of FlowEdge to path for single-hop paths 
in Multi-hop flow compiler.

Closes #2472 from sv2000/pathFinderBug


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b9149bfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b9149bfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b9149bfd

Branch: refs/heads/master
Commit: b9149bfd948342163abd15b6b6dc12f9fc83d3f8
Parents: ce78e61
Author: sv2000 <[email protected]>
Authored: Wed Oct 10 11:07:07 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Wed Oct 10 11:07:07 2018 -0700

----------------------------------------------------------------------
 .../pathfinder/AbstractPathFinder.java          |  15 +--
 .../flowgraph/pathfinder/BFSPathFinder.java     | 104 +++++++++----------
 .../modules/flow/MultiHopFlowCompilerTest.java  |  27 ++++-
 .../src/test/resources/flow/flow.conf           |  24 -----
 .../src/test/resources/flow/flow1.conf          |  24 +++++
 .../src/test/resources/flow/flow2.conf          |  19 ++++
 .../src/test/resources/flow/multicastFlow.conf  |  19 ----
 7 files changed, 117 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 6c1d0b2..eed5603 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.service.modules.flowgraph.pathfinder;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -215,24 +214,16 @@ public abstract class AbstractPathFinder implements 
PathFinder {
    *
    * @param flowEdgeContext of the last {@link FlowEdge} in the path.
    * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link 
FlowSpec}.
-   * @throws IOException
-   * @throws SpecNotFoundException
-   * @throws JobTemplate.TemplateException
-   * @throws URISyntaxException
    */
-  protected List<FlowEdgeContext> constructPath(FlowEdgeContext 
flowEdgeContext)
-      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
+  List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) {
     //Backtrace from the last edge using the path map and push each edge into 
a LIFO data structure.
     List<FlowEdgeContext> path = new LinkedList<>();
     path.add(flowEdgeContext);
     FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
-    while (true) {
+    //While we are not at the first edge in the path, add the edge to the path
+    while 
(!this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
       path.add(0, this.pathMap.get(currentFlowEdgeContext));
       currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
-      //Are we at the first edge in the path?
-      if 
(this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
-        break;
-      }
     }
     return path;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index 6c95ae5..4f650ad 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.service.modules.flowgraph.pathfinder;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -28,8 +26,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
@@ -60,7 +56,8 @@ public class BFSPathFinder extends AbstractPathFinder {
    * Constructor.
    * @param flowGraph
    */
-  public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws 
ReflectiveOperationException {
+  public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
+      throws ReflectiveOperationException {
     super(flowGraph, flowSpec);
   }
 
@@ -70,66 +67,61 @@ public class BFSPathFinder extends AbstractPathFinder {
    * added first to the queue. This ensures that dataset transformations are 
always performed closest to the source.
    * @return a path of {@link FlowEdgeContext}s starting at the srcNode and 
ending at the destNode.
    */
-  public List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws 
PathFinderException {
-    try {
-      //Initialization of auxiliary data structures used for path computation
-      this.pathMap = new HashMap<>();
+  public List<FlowEdgeContext> findPathUnicast(DataNode destNode) {
+    //Initialization of auxiliary data structures used for path computation
+    this.pathMap = new HashMap<>();
 
-      //Path computation must be thread-safe to guarantee read consistency. In 
other words, we prevent concurrent read/write access to the
-      // flow graph.
-        //Base condition 1: Source Node or Dest Node is inactive; return null
-        if (!srcNode.isActive() || !destNode.isActive()) {
-          log.warn("Either source node {} or destination node {} is inactive; 
skipping path computation.", this.srcNode.getId(),
-              destNode.getId());
-          return null;
-        }
+    //Path computation must be thread-safe to guarantee read consistency. In 
other words, we prevent concurrent read/write access to the
+    // flow graph.
+    //Base condition 1: Source Node or Dest Node is inactive; return null
+    if (!srcNode.isActive() || !destNode.isActive()) {
+      log.warn("Either source node {} or destination node {} is inactive; 
skipping path computation.",
+          this.srcNode.getId(), destNode.getId());
+      return null;
+    }
 
-        //Base condition 2: Check if we are already at the target. If so, 
return an empty path.
-        if ((srcNode.equals(destNode)) && 
destDatasetDescriptor.contains(srcDatasetDescriptor)) {
-          return new ArrayList<>();
-        }
+    //Base condition 2: Check if we are already at the target. If so, return 
an empty path.
+    if ((srcNode.equals(destNode)) && 
destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+      return new ArrayList<>();
+    }
 
-        LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
-        edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, 
destDatasetDescriptor));
-        for (FlowEdgeContext flowEdgeContext : edgeQueue) {
-          this.pathMap.put(flowEdgeContext, flowEdgeContext);
-        }
+    LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+    edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, 
destDatasetDescriptor));
+    for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+      this.pathMap.put(flowEdgeContext, flowEdgeContext);
+    }
 
-        //At every step, pop an edge E from the edge queue. Mark the edge E as 
visited. Generate the list of adjacent edges
-        // to the edge E. For each adjacent edge E', do the following:
-        //    1. check if the FlowTemplate described by E' is resolvable using 
the flowConfig, and
-        //    2. check if the output dataset descriptor of edge E is 
compatible with the input dataset descriptor of the
-        //       edge E'. If yes, add the edge E' to the edge queue.
-        // If the edge E' satisfies 1 and 2, add it to the edge queue for 
further consideration.
-        while (!edgeQueue.isEmpty()) {
-          FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+    //At every step, pop an edge E from the edge queue. Mark the edge E as 
visited. Generate the list of adjacent edges
+    // to the edge E. For each adjacent edge E', do the following:
+    //    1. check if the FlowTemplate described by E' is resolvable using the 
flowConfig, and
+    //    2. check if the output dataset descriptor of edge E is compatible 
with the input dataset descriptor of the
+    //       edge E'. If yes, add the edge E' to the edge queue.
+    // If the edge E' satisfies 1 and 2, add it to the edge queue for further 
consideration.
+    while (!edgeQueue.isEmpty()) {
+      FlowEdgeContext flowEdgeContext = edgeQueue.pop();
 
-          DataNode currentNode = 
this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
-          DatasetDescriptor currentOutputDatasetDescriptor = 
flowEdgeContext.getOutputDatasetDescriptor();
+      DataNode currentNode = 
this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+      DatasetDescriptor currentOutputDatasetDescriptor = 
flowEdgeContext.getOutputDatasetDescriptor();
 
-          //Are we done?
-          if (isPathFound(currentNode, destNode, 
currentOutputDatasetDescriptor, destDatasetDescriptor)) {
-            return constructPath(flowEdgeContext);
-          }
+      //Are we done?
+      if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, 
destDatasetDescriptor)) {
+        return constructPath(flowEdgeContext);
+      }
 
-          //Expand the currentNode to its adjacent edges and add them to the 
queue.
-          List<FlowEdgeContext> nextEdges =
-              getNextEdges(currentNode, currentOutputDatasetDescriptor, 
destDatasetDescriptor);
-          for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
-            //Add a pointer from the child edge to the parent edge, if the 
child edge is not already in the
-            // queue.
-            if (!this.pathMap.containsKey(childFlowEdgeContext)) {
-              edgeQueue.add(childFlowEdgeContext);
-              this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
-            }
-          }
+      //Expand the currentNode to its adjacent edges and add them to the queue.
+      List<FlowEdgeContext> nextEdges =
+          getNextEdges(currentNode, currentOutputDatasetDescriptor, 
destDatasetDescriptor);
+      for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+        //Add a pointer from the child edge to the parent edge, if the child 
edge is not already in the
+        // queue.
+        if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+          edgeQueue.add(childFlowEdgeContext);
+          this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
         }
-      //No path found. Return null.
-      return null;
-    } catch (SpecNotFoundException | JobTemplate.TemplateException | 
IOException | URISyntaxException e) {
-      throw new PathFinder.PathFinderException(
-          "Exception encountered when computing path from src: " + 
this.srcNode.getId() + " to dest: " + destNode.getId(), e);
+      }
     }
+    //No path found. Return null.
+    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 955205b..8574dac 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -168,9 +168,10 @@ public class MultiHopFlowCompilerTest {
     FlowSpec spec = flowSpecBuilder.build();
     return spec;
   }
+
   @Test
   public void testCompileFlow() throws URISyntaxException, IOException {
-    FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1");
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
     Assert.assertEquals(jobDag.getNodes().size(), 4);
     Assert.assertEquals(jobDag.getStartNodes().size(), 1);
@@ -273,7 +274,7 @@ public class MultiHopFlowCompilerTest {
     //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
     this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
 
-    FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1");
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     Assert.assertEquals(jobDag.getNodes().size(), 4);
@@ -377,7 +378,7 @@ public class MultiHopFlowCompilerTest {
     //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
     this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
 
-    FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1");
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     //Ensure no path to destination.
@@ -385,8 +386,26 @@ public class MultiHopFlowCompilerTest {
   }
 
   @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion")
+  public void testCompileFlowSingleHop() throws IOException, 
URISyntaxException {
+    FlowSpec spec = createFlowSpec("flow/flow2.conf", "HDFS-1", "HDFS-3");
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+    Assert.assertEquals(jobDag.getNodes().size(), 1);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+    Assert.assertEquals(jobDag.getStartNodes().get(0), 
jobDag.getEndNodes().get(0));
+
+    //Ensure hop is from HDFS-1 to HDFS-3.
+    Dag.DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0);
+    JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+    Config jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), 
"hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), 
"hdfs://hadoopnn03.grid.linkedin.com:8888/");
+  }
+
+
+  @Test (dependsOnMethods = "testCompileFlowSingleHop")
   public void testMulticastPath() throws IOException, URISyntaxException {
-    FlowSpec spec = createFlowSpec("flow/multicastFlow.conf", "LocalFS-1", 
"HDFS-3,HDFS-4");
+    FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", 
"HDFS-3,HDFS-4");
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     Assert.assertEquals(jobDag.getNodes().size(), 4);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow.conf 
b/gobblin-service/src/test/resources/flow/flow.conf
deleted file mode 100644
index f818df6..0000000
--- a/gobblin-service/src/test/resources/flow/flow.conf
+++ /dev/null
@@ -1,24 +0,0 @@
-team.name=testTeam
-dataset.name=testDataset
-user.to.proxy=testUser
-adls.user.to.proxy=adlsTestUser
-adls.oauth2.client.id=1234
-adls.ouath2.credential=credential
-
-#Input dataset - uncompressed and unencrypted
-gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
-gobblin.flow.input.dataset.descriptor.platform=hdfs
-gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
-gobblin.flow.input.dataset.descriptor.format=avro
-gobblin.flow.input.dataset.descriptor.codec=NONE
-gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
-
-#Output dataset - compressed and encrypted
-gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
-gobblin.flow.output.dataset.descriptor.platform=adls
-gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name}
-gobblin.flow.output.dataset.descriptor.format=json
-gobblin.flow.output.dataset.descriptor.codec=gzip
-gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
-gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
-gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/flow1.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow1.conf 
b/gobblin-service/src/test/resources/flow/flow1.conf
new file mode 100644
index 0000000..f818df6
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow1.conf
@@ -0,0 +1,24 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+adls.user.to.proxy=adlsTestUser
+adls.oauth2.client.id=1234
+adls.ouath2.credential=credential
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - compressed and encrypted
+gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.output.dataset.descriptor.platform=adls
+gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.output.dataset.descriptor.format=json
+gobblin.flow.output.dataset.descriptor.codec=gzip
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/flow2.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow2.conf 
b/gobblin-service/src/test/resources/flow/flow2.conf
new file mode 100644
index 0000000..1b92343
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow2.conf
@@ -0,0 +1,19 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - same as input dataset
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.output.dataset.descriptor.format=${gobblin.flow.input.dataset.descriptor.format}
+gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec}
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/multicastFlow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/multicastFlow.conf 
b/gobblin-service/src/test/resources/flow/multicastFlow.conf
deleted file mode 100644
index 1b92343..0000000
--- a/gobblin-service/src/test/resources/flow/multicastFlow.conf
+++ /dev/null
@@ -1,19 +0,0 @@
-team.name=testTeam
-dataset.name=testDataset
-user.to.proxy=testUser
-
-#Input dataset - uncompressed and unencrypted
-gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
-gobblin.flow.input.dataset.descriptor.platform=hdfs
-gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
-gobblin.flow.input.dataset.descriptor.format=avro
-gobblin.flow.input.dataset.descriptor.codec=NONE
-gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
-
-#Output dataset - same as input dataset
-gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
-gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
-gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path}
-gobblin.flow.output.dataset.descriptor.format=${gobblin.flow.input.dataset.descriptor.format}
-gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec}
-gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm}
\ No newline at end of file

Reply via email to the mailing list.