Repository: incubator-gobblin Updated Branches: refs/heads/master ce78e619a -> b9149bfd9
[GOBBLIN-606] Fix duplicate addition of FlowEdge to path for single-hop paths in Multi-hop flow compiler. Closes #2472 from sv2000/pathFinderBug Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b9149bfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b9149bfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b9149bfd Branch: refs/heads/master Commit: b9149bfd948342163abd15b6b6dc12f9fc83d3f8 Parents: ce78e61 Author: sv2000 <[email protected]> Authored: Wed Oct 10 11:07:07 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Oct 10 11:07:07 2018 -0700 ---------------------------------------------------------------------- .../pathfinder/AbstractPathFinder.java | 15 +-- .../flowgraph/pathfinder/BFSPathFinder.java | 104 +++++++++---------- .../modules/flow/MultiHopFlowCompilerTest.java | 27 ++++- .../src/test/resources/flow/flow.conf | 24 ----- .../src/test/resources/flow/flow1.conf | 24 +++++ .../src/test/resources/flow/flow2.conf | 19 ++++ .../src/test/resources/flow/multicastFlow.conf | 19 ---- 7 files changed, 117 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java index 6c1d0b2..eed5603 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java +++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java @@ -18,7 +18,6 @@ package org.apache.gobblin.service.modules.flowgraph.pathfinder; import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -215,24 +214,16 @@ public abstract class AbstractPathFinder implements PathFinder { * * @param flowEdgeContext of the last {@link FlowEdge} in the path. * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}. - * @throws IOException - * @throws SpecNotFoundException - * @throws JobTemplate.TemplateException - * @throws URISyntaxException */ - protected List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) - throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { + List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) { //Backtrace from the last edge using the path map and push each edge into a LIFO data structure. List<FlowEdgeContext> path = new LinkedList<>(); path.add(flowEdgeContext); FlowEdgeContext currentFlowEdgeContext = flowEdgeContext; - while (true) { + //While we are not at the first edge in the path, add the edge to the path + while (!this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) { path.add(0, this.pathMap.get(currentFlowEdgeContext)); currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext); - //Are we at the first edge in the path? 
- if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) { - break; - } } return path; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java index 6c95ae5..4f650ad 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java @@ -17,8 +17,6 @@ package org.apache.gobblin.service.modules.flowgraph.pathfinder; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -28,8 +26,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; import org.apache.gobblin.service.modules.flow.FlowEdgeContext; import org.apache.gobblin.service.modules.flowgraph.DataNode; @@ -60,7 +56,8 @@ public class BFSPathFinder extends AbstractPathFinder { * Constructor. * @param flowGraph */ - public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException { + public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) + throws ReflectiveOperationException { super(flowGraph, flowSpec); } @@ -70,66 +67,61 @@ public class BFSPathFinder extends AbstractPathFinder { * added first to the queue. 
This ensures that dataset transformations are always performed closest to the source. * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode. */ - public List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException { - try { - //Initialization of auxiliary data structures used for path computation - this.pathMap = new HashMap<>(); + public List<FlowEdgeContext> findPathUnicast(DataNode destNode) { + //Initialization of auxiliary data structures used for path computation + this.pathMap = new HashMap<>(); - //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the - // flow graph. - //Base condition 1: Source Node or Dest Node is inactive; return null - if (!srcNode.isActive() || !destNode.isActive()) { - log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(), - destNode.getId()); - return null; - } + //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the + // flow graph. + //Base condition 1: Source Node or Dest Node is inactive; return null + if (!srcNode.isActive() || !destNode.isActive()) { + log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", + this.srcNode.getId(), destNode.getId()); + return null; + } - //Base condition 2: Check if we are already at the target. If so, return an empty path. - if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) { - return new ArrayList<>(); - } + //Base condition 2: Check if we are already at the target. If so, return an empty path. 
+ if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) { + return new ArrayList<>(); + } - LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>(); - edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor)); - for (FlowEdgeContext flowEdgeContext : edgeQueue) { - this.pathMap.put(flowEdgeContext, flowEdgeContext); - } + LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>(); + edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor)); + for (FlowEdgeContext flowEdgeContext : edgeQueue) { + this.pathMap.put(flowEdgeContext, flowEdgeContext); + } - //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges - // to the edge E. For each adjacent edge E', do the following: - // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and - // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the - // edge E'. If yes, add the edge E' to the edge queue. - // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration. - while (!edgeQueue.isEmpty()) { - FlowEdgeContext flowEdgeContext = edgeQueue.pop(); + //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges + // to the edge E. For each adjacent edge E', do the following: + // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and + // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the + // edge E'. If yes, add the edge E' to the edge queue. + // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration. 
+ while (!edgeQueue.isEmpty()) { + FlowEdgeContext flowEdgeContext = edgeQueue.pop(); - DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest()); - DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor(); + DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest()); + DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor(); - //Are we done? - if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) { - return constructPath(flowEdgeContext); - } + //Are we done? + if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) { + return constructPath(flowEdgeContext); + } - //Expand the currentNode to its adjacent edges and add them to the queue. - List<FlowEdgeContext> nextEdges = - getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor); - for (FlowEdgeContext childFlowEdgeContext : nextEdges) { - //Add a pointer from the child edge to the parent edge, if the child edge is not already in the - // queue. - if (!this.pathMap.containsKey(childFlowEdgeContext)) { - edgeQueue.add(childFlowEdgeContext); - this.pathMap.put(childFlowEdgeContext, flowEdgeContext); - } - } + //Expand the currentNode to its adjacent edges and add them to the queue. + List<FlowEdgeContext> nextEdges = + getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor); + for (FlowEdgeContext childFlowEdgeContext : nextEdges) { + //Add a pointer from the child edge to the parent edge, if the child edge is not already in the + // queue. + if (!this.pathMap.containsKey(childFlowEdgeContext)) { + edgeQueue.add(childFlowEdgeContext); + this.pathMap.put(childFlowEdgeContext, flowEdgeContext); } - //No path found. Return null. 
- return null; - } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) { - throw new PathFinder.PathFinderException( - "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode.getId(), e); + } } + //No path found. Return null. + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index 955205b..8574dac 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -168,9 +168,10 @@ public class MultiHopFlowCompilerTest { FlowSpec spec = flowSpecBuilder.build(); return spec; } + @Test public void testCompileFlow() throws URISyntaxException, IOException { - FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1"); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 4); Assert.assertEquals(jobDag.getStartNodes().size(), 1); @@ -273,7 +274,7 @@ public class MultiHopFlowCompilerTest { //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt. 
this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt"); - FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1"); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 4); @@ -377,7 +378,7 @@ public class MultiHopFlowCompilerTest { //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt. this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt"); - FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1"); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); //Ensure no path to destination. @@ -385,8 +386,26 @@ public class MultiHopFlowCompilerTest { } @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion") + public void testCompileFlowSingleHop() throws IOException, URISyntaxException { + FlowSpec spec = createFlowSpec("flow/flow2.conf", "HDFS-1", "HDFS-3"); + Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); + Assert.assertEquals(jobDag.getNodes().size(), 1); + Assert.assertEquals(jobDag.getStartNodes().size(), 1); + Assert.assertEquals(jobDag.getEndNodes().size(), 1); + Assert.assertEquals(jobDag.getStartNodes().get(0), jobDag.getEndNodes().get(0)); + + //Ensure hop is from HDFS-1 to HDFS-3. 
+ Dag.DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0); + JobExecutionPlan jobExecutionPlan = dagNode.getValue(); + Config jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); + } + + + @Test (dependsOnMethods = "testCompileFlowSingleHop") public void testMulticastPath() throws IOException, URISyntaxException { - FlowSpec spec = createFlowSpec("flow/multicastFlow.conf", "LocalFS-1", "HDFS-3,HDFS-4"); + FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", "HDFS-3,HDFS-4"); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 4); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flow/flow.conf b/gobblin-service/src/test/resources/flow/flow.conf deleted file mode 100644 index f818df6..0000000 --- a/gobblin-service/src/test/resources/flow/flow.conf +++ /dev/null @@ -1,24 +0,0 @@ -team.name=testTeam -dataset.name=testDataset -user.to.proxy=testUser -adls.user.to.proxy=adlsTestUser -adls.oauth2.client.id=1234 -adls.ouath2.credential=credential - -#Input dataset - uncompressed and unencrypted -gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor -gobblin.flow.input.dataset.descriptor.platform=hdfs -gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name} -gobblin.flow.input.dataset.descriptor.format=avro -gobblin.flow.input.dataset.descriptor.codec=NONE -gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE - -#Output dataset - compressed and encrypted 
-gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor -gobblin.flow.output.dataset.descriptor.platform=adls -gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name} -gobblin.flow.output.dataset.descriptor.format=json -gobblin.flow.output.dataset.descriptor.codec=gzip -gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating -gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json -gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/flow1.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flow/flow1.conf b/gobblin-service/src/test/resources/flow/flow1.conf new file mode 100644 index 0000000..f818df6 --- /dev/null +++ b/gobblin-service/src/test/resources/flow/flow1.conf @@ -0,0 +1,24 @@ +team.name=testTeam +dataset.name=testDataset +user.to.proxy=testUser +adls.user.to.proxy=adlsTestUser +adls.oauth2.client.id=1234 +adls.ouath2.credential=credential + +#Input dataset - uncompressed and unencrypted +gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.input.dataset.descriptor.platform=hdfs +gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.input.dataset.descriptor.format=avro +gobblin.flow.input.dataset.descriptor.codec=NONE +gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE + +#Output dataset - compressed and encrypted +gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.output.dataset.descriptor.platform=adls +gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name} 
+gobblin.flow.output.dataset.descriptor.format=json +gobblin.flow.output.dataset.descriptor.codec=gzip +gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating +gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json +gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/flow2.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flow/flow2.conf b/gobblin-service/src/test/resources/flow/flow2.conf new file mode 100644 index 0000000..1b92343 --- /dev/null +++ b/gobblin-service/src/test/resources/flow/flow2.conf @@ -0,0 +1,19 @@ +team.name=testTeam +dataset.name=testDataset +user.to.proxy=testUser + +#Input dataset - uncompressed and unencrypted +gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.input.dataset.descriptor.platform=hdfs +gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.input.dataset.descriptor.format=avro +gobblin.flow.input.dataset.descriptor.codec=NONE +gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE + +#Output dataset - same as input dataset +gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class} +gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform} +gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path} +gobblin.flow.output.dataset.descriptor.format=${gobblin.flow.input.dataset.descriptor.format} +gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec} +gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b9149bfd/gobblin-service/src/test/resources/flow/multicastFlow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flow/multicastFlow.conf b/gobblin-service/src/test/resources/flow/multicastFlow.conf deleted file mode 100644 index 1b92343..0000000 --- a/gobblin-service/src/test/resources/flow/multicastFlow.conf +++ /dev/null @@ -1,19 +0,0 @@ -team.name=testTeam -dataset.name=testDataset -user.to.proxy=testUser - -#Input dataset - uncompressed and unencrypted -gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor -gobblin.flow.input.dataset.descriptor.platform=hdfs -gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name} -gobblin.flow.input.dataset.descriptor.format=avro -gobblin.flow.input.dataset.descriptor.codec=NONE -gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE - -#Output dataset - same as input dataset -gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class} -gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform} -gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path} -gobblin.flow.output.dataset.descriptor.format=${gobblin.flow.input.dataset.descriptor.format} -gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec} -gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm} \ No newline at end of file
