Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ef26d287d -> cd5222775
[GOBBLIN-552] Add multicast option to the MultiHopFlowCompiler. Closes #2413 from sv2000/multicast Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cd522277 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cd522277 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cd522277 Branch: refs/heads/master Commit: cd5222775cb8dcb3022b65a77cae1d3225a0070d Parents: ef26d28 Author: suvasude <[email protected]> Authored: Sun Aug 12 22:01:59 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Sun Aug 12 22:01:59 2018 -0700 ---------------------------------------------------------------------- .../service/modules/flow/FlowGraphPath.java | 35 +++-- .../modules/flow/FlowGraphPathFinder.java | 149 +++++++++++-------- .../modules/flow/MultiHopFlowCompiler.java | 2 +- .../gobblin/service/modules/flowgraph/Dag.java | 37 ++++- .../modules/flow/MultiHopFlowCompilerTest.java | 34 ++++- .../service/modules/flowgraph/DagTest.java | 101 +++++++++++-- .../src/test/resources/flow/multicastFlow.conf | 19 +++ .../multihop/jobTemplates/distcp.template | 1 - 8 files changed, 287 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java index c642708..1b7ce11 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java @@ -23,8 +23,11 @@ import 
java.util.ArrayList; import java.util.Iterator; import java.util.List; +import com.google.common.collect.Lists; import com.typesafe.config.Config; +import lombok.Getter; + import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecExecutor; @@ -38,27 +41,37 @@ import org.apache.gobblin.service.modules.template.FlowTemplate; /** - * A class that returns a {@link Dag} of {@link JobExecutionPlan}s from a sequence of edges - * represented as a {@link List} of {@link FlowEdgeContext}s. + * A class that encapsulates a path in the {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. */ public class FlowGraphPath { - private List<FlowEdgeContext> path; + @Getter + private List<List<FlowEdgeContext>> paths; private FlowSpec flowSpec; private Long flowExecutionId; - public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long flowExecutionId) { - this.path = path; + public FlowGraphPath(FlowSpec flowSpec, Long flowExecutionId) { this.flowSpec = flowSpec; this.flowExecutionId = flowExecutionId; } - public Dag<JobExecutionPlan> asDag() - throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { + public void addPath(List<FlowEdgeContext> path) { + if (this.paths == null) { + this.paths = new ArrayList<>(); + } + this.paths.add(path); + } + + public Dag<JobExecutionPlan> asDag() throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>()); - Iterator<FlowEdgeContext> pathIterator = path.iterator(); - while (pathIterator.hasNext()) { - Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next()); - flowDag = flowDag.concatenate(flowEdgeDag); + + for(List<FlowEdgeContext> path: paths) { + Dag<JobExecutionPlan> pathDag = new Dag<>(new ArrayList<>()); + Iterator<FlowEdgeContext> pathIterator = path.iterator(); + while (pathIterator.hasNext()) { + 
Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next()); + pathDag = pathDag.concatenate(flowEdgeDag); + } + flowDag = flowDag.merge(pathDag); } return flowDag; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java index 2b4746c..59c831d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java @@ -61,7 +61,7 @@ public class FlowGraphPathFinder { private Config flowConfig; private DataNode srcNode; - private DataNode destNode; + private List<DataNode> destNodes; private DatasetDescriptor srcDatasetDescriptor; private DatasetDescriptor destDatasetDescriptor; @@ -83,12 +83,18 @@ public class FlowGraphPathFinder { //Get src/dest DataNodes from the flow config String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""); - String destNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, ""); + + List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY); this.srcNode = this.flowGraph.getNode(srcNodeId); Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId); - this.destNode = this.flowGraph.getNode(destNodeId); - Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId); - + for (String destNodeId : destNodeIds) { + DataNode destNode = this.flowGraph.getNode(destNodeId); + 
Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId); + if (this.destNodes == null) { + this.destNodes = new ArrayList<>(); + } + this.destNodes.add(destNode); + } //Get src/dest dataset descriptors from the flow config Config srcDatasetDescriptorConfig = flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX); @@ -109,70 +115,85 @@ public class FlowGraphPathFinder { } } + public FlowGraphPath findPath() throws PathFinderException { + // Generate flow execution id for this compilation + this.flowExecutionId = System.currentTimeMillis(); + + FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId); + //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the + // flow graph. + // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will + // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers. + synchronized (this.flowGraph) { + for (DataNode destNode : this.destNodes) { + List<FlowEdgeContext> path = findPathBFS(destNode); + if (path != null) { + flowGraphPath.addPath(path); + } else { + //No path to at least one of the destination nodes. + return null; + } + } + } + return flowGraphPath; + } + /** * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are * added first to the queue. This ensures that dataset transformations are always performed closest to the source. * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode. 
*/ - public FlowGraphPath findPath() throws PathFinderException { + private List<FlowEdgeContext> findPathBFS(DataNode destNode) + throws PathFinderException { try { //Initialization of auxiliary data structures used for path computation this.pathMap = new HashMap<>(); - // Generate flow execution id for this compilation - this.flowExecutionId = System.currentTimeMillis(); - - //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the - // flow graph. - // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will - // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers. - synchronized (this.flowGraph) { - //Base condition 1: Source Node or Dest Node is inactive; return null - if (!srcNode.isActive() || !destNode.isActive()) { - log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(), - this.destNode.getId()); - return null; - } + //Base condition 1: Source Node or Dest Node is inactive; return null + if (!srcNode.isActive() || !destNode.isActive()) { + log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", + this.srcNode.getId(), destNode.getId()); + return null; + } - //Base condition 2: Check if we are already at the target. If so, return an empty path. - if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) { - return new FlowGraphPath(new ArrayList<>(), flowSpec, flowExecutionId); - } + //Base condition 2: Check if we are already at the target. If so, return an empty path. 
+ if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) { + return new ArrayList<>(); + } - LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>(); - edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor)); - for (FlowEdgeContext flowEdgeContext : edgeQueue) { - this.pathMap.put(flowEdgeContext, flowEdgeContext); - } + LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>(); + edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor)); + for (FlowEdgeContext flowEdgeContext : edgeQueue) { + this.pathMap.put(flowEdgeContext, flowEdgeContext); + } - //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges - // to the edge E. For each adjacent edge E', do the following: - // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and - // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the - // edge E'. If yes, add the edge E' to the edge queue. - // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration. - while (!edgeQueue.isEmpty()) { - FlowEdgeContext flowEdgeContext = edgeQueue.pop(); - - DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest()); - DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor(); - - //Are we done? - if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) { - return constructPath(flowEdgeContext); - } + //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges + // to the edge E. For each adjacent edge E', do the following: + // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and + // 2. 
check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the + // edge E'. If yes, add the edge E' to the edge queue. + // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration. + while (!edgeQueue.isEmpty()) { + FlowEdgeContext flowEdgeContext = edgeQueue.pop(); + + DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest()); + DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor(); + + //Are we done? + if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) { + return constructPath(flowEdgeContext); + } - //Expand the currentNode to its adjacent edges and add them to the queue. - List<FlowEdgeContext> nextEdges = - getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor); - for (FlowEdgeContext childFlowEdgeContext : nextEdges) { - //Add a pointer from the child edge to the parent edge, if the child edge is not already in the - // queue. - if (!this.pathMap.containsKey(childFlowEdgeContext)) { - edgeQueue.add(childFlowEdgeContext); - this.pathMap.put(childFlowEdgeContext, flowEdgeContext); - } + //Expand the currentNode to its adjacent edges and add them to the queue. + List<FlowEdgeContext> nextEdges = + getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor); + for (FlowEdgeContext childFlowEdgeContext : nextEdges) { + //Add a pointer from the child edge to the parent edge, if the child edge is not already in the + // queue. 
+ if (!this.pathMap.containsKey(childFlowEdgeContext)) { + edgeQueue.add(childFlowEdgeContext); + this.pathMap.put(childFlowEdgeContext, flowEdgeContext); } } } @@ -180,7 +201,8 @@ public class FlowGraphPathFinder { return null; } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) { throw new PathFinderException( - "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + this.destNode.getId(), e); + "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode + .getId(), e); } } @@ -213,7 +235,7 @@ public class FlowGraphPathFinder { boolean foundExecutor = false; //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template. - for (SpecExecutor specExecutor: flowEdge.getExecutors()) { + for (SpecExecutor specExecutor : flowEdge.getExecutors()) { Config mergedConfig = getMergedConfig(flowEdge, specExecutor); List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs = flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig); @@ -226,10 +248,14 @@ public class FlowGraphPathFinder { //If datasets described by the currentDatasetDescriptor is a subset of the datasets described // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g. // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward. - flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, specExecutor); + flowEdgeContext = + new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, + specExecutor); } else { //outputDatasetDescriptor is more specific (e.g. 
if it is a dataset transformation edge) - flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, specExecutor); + flowEdgeContext = + new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, + specExecutor); } if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) { //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible @@ -289,7 +315,7 @@ public class FlowGraphPathFinder { * @throws JobTemplate.TemplateException * @throws URISyntaxException */ - private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext) + private List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { //Backtrace from the last edge using the path map and push each edge into a LIFO data structure. List<FlowEdgeContext> path = new LinkedList<>(); @@ -303,8 +329,7 @@ public class FlowGraphPathFinder { break; } } - FlowGraphPath flowGraphPath = new FlowGraphPath(path, flowSpec, flowExecutionId); - return flowGraphPath; + return path; } public static class PathFinderException extends Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index a281aa4..83951a0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -136,7 +136,7 @@ public class 
MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { log.info(String.format("No path found from source: %s and destination: %s", source, destination)); return new JobExecutionPlanDagFactory().createDag(new ArrayList<>()); } - } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | IOException e) { + } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException e) { Instrumented.markMeter(this.flowCompilationFailedMeter); log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e); return null; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 58bbb81..3606897 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -17,7 +17,6 @@ package org.apache.gobblin.service.modules.flowgraph; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,10 +24,10 @@ import java.util.Map; import com.google.common.collect.Lists; -import org.apache.gobblin.annotation.Alpha; - import lombok.Getter; +import org.apache.gobblin.annotation.Alpha; + /** * An implementation of Dag. Assumes that nodes have unique values. 
Nodes with duplicate values will produce @@ -100,7 +99,7 @@ public class Dag<T> { * @param other dag to concatenate to this dag * @return the concatenated dag */ - public Dag<T> concatenate(Dag<T> other) throws IOException { + public Dag<T> concatenate(Dag<T> other) { if (other == null || other.isEmpty()) { return this; } @@ -115,6 +114,36 @@ public class Dag<T> { } this.endNodes = other.endNodes; } + //Append all the entries from the other dag's parentChildMap to this dag's parentChildMap + for (Map.Entry<DagNode, List<DagNode<T>>> entry: other.parentChildMap.entrySet()) { + this.parentChildMap.put(entry.getKey(), entry.getValue()); + } + this.nodes.addAll(other.nodes); + return this; + } + + /** + * Merge the "other" dag to "this" dag and return "this" dag as a forest of the two dags. + * More specifically, the merge() operation takes two dags and returns a disjoint union of the two dags. + * + * @param other dag to merge to this dag + * @return the disjoint union of the two dags + */ + + public Dag<T> merge(Dag<T> other) { + if (other == null || other.isEmpty()) { + return this; + } + if (this.isEmpty()) { + return other; + } + //Append all the entries from the other dag's parentChildMap to this dag's parentChildMap + for (Map.Entry<DagNode, List<DagNode<T>>> entry: other.parentChildMap.entrySet()) { + this.parentChildMap.put(entry.getKey(), entry.getValue()); + } + //Append the startNodes, endNodes and nodes from the other dag to this dag. 
+ this.startNodes.addAll(other.startNodes); + this.endNodes.addAll(other.endNodes); this.nodes.addAll(other.nodes); return this; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index b8bac02..c214b35 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -23,6 +23,7 @@ import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.Charset; +import java.util.List; import java.util.Properties; import java.util.concurrent.Future; @@ -121,7 +122,6 @@ public class MultiHopFlowCompilerTest { this.flowGraph.addFlowEdge(edge); } } - this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph); } @@ -153,7 +153,6 @@ public class MultiHopFlowCompilerTest { FlowSpec spec = flowSpecBuilder.build(); return spec; } - @Test public void testCompileFlow() throws URISyntaxException, IOException { FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); @@ -370,6 +369,37 @@ public class MultiHopFlowCompilerTest { Assert.assertTrue(jobDag.isEmpty()); } + @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion") + public void testMulticastPath() throws IOException, URISyntaxException { + FlowSpec spec = createFlowSpec("flow/multicastFlow.conf", "LocalFS-1", "HDFS-3,HDFS-4"); + Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); + + Assert.assertEquals(jobDag.getNodes().size(), 4); + 
Assert.assertEquals(jobDag.getEndNodes().size(), 2); + Assert.assertEquals(jobDag.getStartNodes().size(), 2); + + int i = 1; + //First hop must be from LocalFS to HDFS-1 and HDFS-2 + for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) { + JobExecutionPlan jobExecutionPlan = dagNode.getValue(); + Config jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "file:///"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn0" + i++ + ".grid.linkedin.com:8888/"); + } + + i = 1; + //Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively. + for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) { + List<Dag.DagNode<JobExecutionPlan>> nextNodes = jobDag.getChildren(dagNode); + Assert.assertEquals(nextNodes.size(), 1); + JobExecutionPlan jobExecutionPlan = nextNodes.get(0).getValue(); + Config jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn0" + i + ".grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn0" + (i + 2) + ".grid.linkedin.com:8888/"); + i += 1; + } + } + @AfterClass public void tearDown() { } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java index 3fc4fca..688f5de 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java @@ -23,12 +23,16 @@ import java.util.Set; 
import org.testng.Assert; import org.testng.annotations.Test; -import org.testng.collections.Lists; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j public class DagTest { @Test - public void testInitialize() throws Exception { + public void testInitialize() { Dag.DagNode<String> dagNode1 = new Dag.DagNode<>("val1"); Dag.DagNode<String> dagNode2 = new Dag.DagNode<>("val2"); Dag.DagNode<String> dagNode3 = new Dag.DagNode<>("val3"); @@ -44,7 +48,7 @@ public class DagTest { List<Dag.DagNode<String>> dagNodeList = Lists.newArrayList(dagNode1, dagNode2, dagNode3, dagNode4, dagNode5); Dag<String> dag = new Dag<>(dagNodeList); //Test startNodes and endNodes - Assert.assertEquals(dag.getStartNodes().size(),1); + Assert.assertEquals(dag.getStartNodes().size(), 1); Assert.assertEquals(dag.getStartNodes().get(0).getValue(), "val1"); Assert.assertEquals(dag.getEndNodes().size(), 2); Assert.assertEquals(dag.getEndNodes().get(0).getValue(), "val4"); @@ -52,9 +56,9 @@ public class DagTest { Dag.DagNode startNode = dag.getStartNodes().get(0); - Assert.assertEquals(dag.getChildren(startNode).size(),2); + Assert.assertEquals(dag.getChildren(startNode).size(), 2); Set<String> childSet = new HashSet<>(); - for(Dag.DagNode<String> node: dag.getChildren(startNode)) { + for (Dag.DagNode<String> node: dag.getChildren(startNode)) { childSet.add(node.getValue()); } Assert.assertTrue(childSet.contains("val2")); @@ -66,7 +70,7 @@ public class DagTest { Assert.assertEquals(dag.getChildren(dagNode2).size(), 1); Assert.assertEquals(dag.getChildren(dagNode2).get(0).getValue(), "val4"); - for(Dag.DagNode<String> node: dag.getChildren(dagNode3)) { + for (Dag.DagNode<String> node: dag.getChildren(dagNode3)) { childSet.add(node.getValue()); } Assert.assertTrue(childSet.contains("val4")); @@ -78,7 +82,7 @@ public class DagTest { } @Test - public void testConcatenate() throws Exception { + public void testConcatenate() { Dag.DagNode<String> dagNode1 = new 
Dag.DagNode<>("val1"); Dag.DagNode<String> dagNode2 = new Dag.DagNode<>("val2"); Dag.DagNode<String> dagNode3 = new Dag.DagNode<>("val3"); @@ -105,7 +109,7 @@ public class DagTest { Dag<String> dagNew = dag1.concatenate(dag2); //Ensure end nodes of first dag are no longer end nodes - for(Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode6, dagNode7)) { + for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode6, dagNode7)) { Assert.assertEquals(dagNew.getParents(dagNode).size(), 2); Set<String> set = new HashSet<>(); set.add(dagNew.getParents(dagNode).get(0).getValue()); @@ -114,7 +118,7 @@ public class DagTest { Assert.assertTrue(set.contains("val5")); } - for(Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5)) { + for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5)) { Assert.assertEquals(dagNew.getChildren(dagNode).size(), 2); Set<String> set = new HashSet<>(); set.add(dagNew.getChildren(dagNode).get(0).getValue()); @@ -123,11 +127,88 @@ public class DagTest { Assert.assertTrue(set.contains("val7")); } + for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode6, dagNode7)) { + List<Dag.DagNode<String>> nextNodes = dagNew.getChildren(dagNode); + Assert.assertEquals(nextNodes.size(), 1); + Assert.assertEquals(nextNodes.get(0).getValue(), "val8"); + } + //Test new start and end nodes. 
- Assert.assertEquals(dagNew.getStartNodes().size(),1); + Assert.assertEquals(dagNew.getStartNodes().size(), 1); Assert.assertEquals(dagNew.getStartNodes().get(0).getValue(), "val1"); Assert.assertEquals(dagNew.getEndNodes().size(), 1); Assert.assertEquals(dagNew.getEndNodes().get(0).getValue(), "val8"); } + + @Test + public void testMerge() { + Dag.DagNode<String> dagNode1 = new Dag.DagNode<>("val1"); + Dag.DagNode<String> dagNode2 = new Dag.DagNode<>("val2"); + Dag.DagNode<String> dagNode3 = new Dag.DagNode<>("val3"); + Dag.DagNode<String> dagNode4 = new Dag.DagNode<>("val4"); + Dag.DagNode<String> dagNode5 = new Dag.DagNode<>("val5"); + + dagNode2.addParentNode(dagNode1); + dagNode3.addParentNode(dagNode1); + dagNode4.addParentNode(dagNode2); + dagNode4.addParentNode(dagNode3); + dagNode5.addParentNode(dagNode3); + + List<Dag.DagNode<String>> dagNodeList = Lists.newArrayList(dagNode1, dagNode2, dagNode3, dagNode4, dagNode5); + Dag<String> dag1 = new Dag<>(dagNodeList); + + Dag.DagNode<String> dagNode6 = new Dag.DagNode<>("val6"); + Dag.DagNode<String> dagNode7 = new Dag.DagNode<>("val7"); + Dag.DagNode<String> dagNode8 = new Dag.DagNode<>("val8"); + dagNode8.addParentNode(dagNode6); + dagNode8.addParentNode(dagNode7); + Dag<String> dag2 = new Dag<>(Lists.newArrayList(dagNode6, dagNode7, dagNode8)); + + //Merge the two dags + Dag<String> dagNew = dag1.merge(dag2); + + //Test the startNodes + Assert.assertEquals(dagNew.getStartNodes().size(), 3); + for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode1, dagNode6, dagNode7)) { + Assert.assertTrue(dagNew.getStartNodes().contains(dagNode)); + Assert.assertNull(dagNew.getParents(dagNode)); + if (dagNode == dagNode1) { + List<Dag.DagNode<String>> nextNodes = dagNew.getChildren(dagNode); + Assert.assertEquals(nextNodes.size(), 2); + Assert.assertTrue(nextNodes.contains(dagNode2)); + Assert.assertTrue(nextNodes.contains(dagNode3)); + } else { + Assert.assertEquals(dagNew.getChildren(dagNode).size(), 1); + 
Assert.assertTrue(dagNew.getChildren(dagNode).contains(dagNode8)); + } + } + + //Test the endNodes + Assert.assertEquals(dagNew.getEndNodes().size(), 3); + for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5, dagNode8)) { + Assert.assertTrue(dagNew.getEndNodes().contains(dagNode)); + Assert.assertNull(dagNew.getChildren(dagNode)); + if (dagNode == dagNode8) { + Assert.assertEquals(dagNew.getParents(dagNode).size(), 2); + Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode6)); + Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode7)); + } else { + Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode3)); + if (dagNode == dagNode4) { + Assert.assertEquals(dagNew.getParents(dagNode).size(), 2); + Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode2)); + } else { + Assert.assertEquals(dagNew.getParents(dagNode).size(), 1); + } + } + } + + //Test the other nodes + Assert.assertEquals(dagNew.getChildren(dagNode2).size(), 1); + Assert.assertTrue(dagNew.getChildren(dagNode2).contains(dagNode4)); + Assert.assertEquals(dagNew.getChildren(dagNode3).size(), 2); + Assert.assertTrue(dagNew.getChildren(dagNode3).contains(dagNode4)); + Assert.assertTrue(dagNew.getChildren(dagNode3).contains(dagNode5)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/resources/flow/multicastFlow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flow/multicastFlow.conf b/gobblin-service/src/test/resources/flow/multicastFlow.conf new file mode 100644 index 0000000..1b92343 --- /dev/null +++ b/gobblin-service/src/test/resources/flow/multicastFlow.conf @@ -0,0 +1,19 @@ +team.name=testTeam +dataset.name=testDataset +user.to.proxy=testUser + +#Input dataset - uncompressed and unencrypted +gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor 
+gobblin.flow.input.dataset.descriptor.platform=hdfs +gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.input.dataset.descriptor.format=avro +gobblin.flow.input.dataset.descriptor.codec=NONE +gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE + +#Output dataset - same as input dataset +gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class} +gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform} +gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path} +gobblin.flow.output.dataset.descriptor.format=${gobblin.flow.input.dataset.descriptor.format} +gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec} +gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template index 844dc92..c8a557e 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template +++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template @@ -26,7 +26,6 @@ target.filebased.fs.uri=${destination.data.node.fs.uri} writer.fs.uri=${target.filebased.fs.uri} work.dir=/tmp/${user.to.proxy} -writer.user.to.proxy=${adls.user.to.proxy} # ==================================================================== # Distcp configurations
