Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ef26d287d -> cd5222775


[GOBBLIN-552] Add multicast option to the MultiHopFlowCompiler.

Closes #2413 from sv2000/multicast


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cd522277
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cd522277
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cd522277

Branch: refs/heads/master
Commit: cd5222775cb8dcb3022b65a77cae1d3225a0070d
Parents: ef26d28
Author: suvasude <[email protected]>
Authored: Sun Aug 12 22:01:59 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Sun Aug 12 22:01:59 2018 -0700

----------------------------------------------------------------------
 .../service/modules/flow/FlowGraphPath.java     |  35 +++--
 .../modules/flow/FlowGraphPathFinder.java       | 149 +++++++++++--------
 .../modules/flow/MultiHopFlowCompiler.java      |   2 +-
 .../gobblin/service/modules/flowgraph/Dag.java  |  37 ++++-
 .../modules/flow/MultiHopFlowCompilerTest.java  |  34 ++++-
 .../service/modules/flowgraph/DagTest.java      | 101 +++++++++++--
 .../src/test/resources/flow/multicastFlow.conf  |  19 +++
 .../multihop/jobTemplates/distcp.template       |   1 -
 8 files changed, 287 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
index c642708..1b7ce11 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -23,8 +23,11 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -38,27 +41,37 @@ import 
org.apache.gobblin.service.modules.template.FlowTemplate;
 
 
 /**
- * A class that returns a {@link Dag} of {@link JobExecutionPlan}s from a 
sequence of edges
- * represented as a {@link List} of {@link FlowEdgeContext}s.
+ * A class that encapsulates a path in the {@link 
org.apache.gobblin.service.modules.flowgraph.FlowGraph}.
  */
 public class FlowGraphPath {
-  private List<FlowEdgeContext> path;
+  @Getter
+  private List<List<FlowEdgeContext>> paths;
   private FlowSpec flowSpec;
   private Long flowExecutionId;
 
-  public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long 
flowExecutionId) {
-    this.path = path;
+  public FlowGraphPath(FlowSpec flowSpec, Long flowExecutionId) {
     this.flowSpec = flowSpec;
     this.flowExecutionId = flowExecutionId;
   }
 
-  public Dag<JobExecutionPlan> asDag()
-      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
+  public void addPath(List<FlowEdgeContext> path) {
+    if (this.paths == null) {
+      this.paths = new ArrayList<>();
+    }
+    this.paths.add(path);
+  }
+
+  public Dag<JobExecutionPlan> asDag() throws SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
     Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>());
-    Iterator<FlowEdgeContext> pathIterator = path.iterator();
-    while (pathIterator.hasNext()) {
-      Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next());
-      flowDag = flowDag.concatenate(flowEdgeDag);
+
+    for(List<FlowEdgeContext> path: paths) {
+      Dag<JobExecutionPlan> pathDag = new Dag<>(new ArrayList<>());
+      Iterator<FlowEdgeContext> pathIterator = path.iterator();
+      while (pathIterator.hasNext()) {
+        Dag<JobExecutionPlan> flowEdgeDag = 
convertHopToDag(pathIterator.next());
+        pathDag = pathDag.concatenate(flowEdgeDag);
+      }
+      flowDag = flowDag.merge(pathDag);
     }
     return flowDag;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
index 2b4746c..59c831d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
@@ -61,7 +61,7 @@ public class FlowGraphPathFinder {
   private Config flowConfig;
 
   private DataNode srcNode;
-  private DataNode destNode;
+  private List<DataNode> destNodes;
 
   private DatasetDescriptor srcDatasetDescriptor;
   private DatasetDescriptor destDatasetDescriptor;
@@ -83,12 +83,18 @@ public class FlowGraphPathFinder {
 
     //Get src/dest DataNodes from the flow config
     String srcNodeId = ConfigUtils.getString(flowConfig, 
ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
-    String destNodeId = ConfigUtils.getString(flowConfig, 
ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+
+    List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, 
ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
     this.srcNode = this.flowGraph.getNode(srcNodeId);
     Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a 
node with id " + srcNodeId);
-    this.destNode = this.flowGraph.getNode(destNodeId);
-    Preconditions.checkArgument(destNode != null, "Flowgraph does not have a 
node with id " + destNodeId);
-
+    for (String destNodeId : destNodeIds) {
+      DataNode destNode = this.flowGraph.getNode(destNodeId);
+      Preconditions.checkArgument(destNode != null, "Flowgraph does not have a 
node with id " + destNodeId);
+      if (this.destNodes == null) {
+        this.destNodes = new ArrayList<>();
+      }
+      this.destNodes.add(destNode);
+    }
     //Get src/dest dataset descriptors from the flow config
     Config srcDatasetDescriptorConfig =
         
flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
@@ -109,70 +115,85 @@ public class FlowGraphPathFinder {
     }
   }
 
+  public FlowGraphPath findPath() throws PathFinderException {
+    // Generate flow execution id for this compilation
+    this.flowExecutionId = System.currentTimeMillis();
+
+    FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
+    //Path computation must be thread-safe to guarantee read consistency. In 
other words, we prevent concurrent read/write access to the
+    // flow graph.
+    // TODO: we can easily improve the performance by using a 
ReentrantReadWriteLock associated with the FlowGraph. This will
+    // allow multiple concurrent readers to not be blocked on each other, as 
long as there are no writers.
+    synchronized (this.flowGraph) {
+      for (DataNode destNode : this.destNodes) {
+        List<FlowEdgeContext> path = findPathBFS(destNode);
+        if (path != null) {
+          flowGraphPath.addPath(path);
+        } else {
+          //No path to at least one of the destination nodes.
+          return null;
+        }
+      }
+    }
+    return flowGraphPath;
+  }
+
   /**
    * A simple path finding algorithm based on Breadth-First Search. At every 
step the algorithm adds the adjacent {@link FlowEdge}s
    * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} 
matches the destDatasetDescriptor are
    * added first to the queue. This ensures that dataset transformations are 
always performed closest to the source.
    * @return a path of {@link FlowEdgeContext}s starting at the srcNode and 
ending at the destNode.
    */
-  public FlowGraphPath findPath() throws PathFinderException {
+  private List<FlowEdgeContext> findPathBFS(DataNode destNode)
+      throws PathFinderException {
     try {
       //Initialization of auxiliary data structures used for path computation
       this.pathMap = new HashMap<>();
 
-      // Generate flow execution id for this compilation
-      this.flowExecutionId = System.currentTimeMillis();
-
-      //Path computation must be thread-safe to guarantee read consistency. In 
other words, we prevent concurrent read/write access to the
-      // flow graph.
-      // TODO: we can easily improve the performance by using a 
ReentrantReadWriteLock associated with the FlowGraph. This will
-      // allow multiple concurrent readers to not be blocked on each other, as 
long as there are no writers.
-      synchronized (this.flowGraph) {
-        //Base condition 1: Source Node or Dest Node is inactive; return null
-        if (!srcNode.isActive() || !destNode.isActive()) {
-          log.warn("Either source node {} or destination node {} is inactive; 
skipping path computation.", this.srcNode.getId(),
-              this.destNode.getId());
-          return null;
-        }
+      //Base condition 1: Source Node or Dest Node is inactive; return null
+      if (!srcNode.isActive() || !destNode.isActive()) {
+        log.warn("Either source node {} or destination node {} is inactive; 
skipping path computation.",
+            this.srcNode.getId(), destNode.getId());
+        return null;
+      }
 
-        //Base condition 2: Check if we are already at the target. If so, 
return an empty path.
-        if ((srcNode.equals(destNode)) && 
destDatasetDescriptor.contains(srcDatasetDescriptor)) {
-          return new FlowGraphPath(new ArrayList<>(), flowSpec, 
flowExecutionId);
-        }
+      //Base condition 2: Check if we are already at the target. If so, return 
an empty path.
+      if ((srcNode.equals(destNode)) && 
destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+        return new ArrayList<>();
+      }
 
-        LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
-        edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, 
destDatasetDescriptor));
-        for (FlowEdgeContext flowEdgeContext : edgeQueue) {
-          this.pathMap.put(flowEdgeContext, flowEdgeContext);
-        }
+      LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+      edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, 
destDatasetDescriptor));
+      for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+        this.pathMap.put(flowEdgeContext, flowEdgeContext);
+      }
 
-        //At every step, pop an edge E from the edge queue. Mark the edge E as 
visited. Generate the list of adjacent edges
-        // to the edge E. For each adjacent edge E', do the following:
-        //    1. check if the FlowTemplate described by E' is resolvable using 
the flowConfig, and
-        //    2. check if the output dataset descriptor of edge E is 
compatible with the input dataset descriptor of the
-        //       edge E'. If yes, add the edge E' to the edge queue.
-        // If the edge E' satisfies 1 and 2, add it to the edge queue for 
further consideration.
-        while (!edgeQueue.isEmpty()) {
-          FlowEdgeContext flowEdgeContext = edgeQueue.pop();
-
-          DataNode currentNode = 
this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
-          DatasetDescriptor currentOutputDatasetDescriptor = 
flowEdgeContext.getOutputDatasetDescriptor();
-
-          //Are we done?
-          if (isPathFound(currentNode, destNode, 
currentOutputDatasetDescriptor, destDatasetDescriptor)) {
-            return constructPath(flowEdgeContext);
-          }
+      //At every step, pop an edge E from the edge queue. Mark the edge E as 
visited. Generate the list of adjacent edges
+      // to the edge E. For each adjacent edge E', do the following:
+      //    1. check if the FlowTemplate described by E' is resolvable using 
the flowConfig, and
+      //    2. check if the output dataset descriptor of edge E is compatible 
with the input dataset descriptor of the
+      //       edge E'. If yes, add the edge E' to the edge queue.
+      // If the edge E' satisfies 1 and 2, add it to the edge queue for 
further consideration.
+      while (!edgeQueue.isEmpty()) {
+        FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+        DataNode currentNode = 
this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+        DatasetDescriptor currentOutputDatasetDescriptor = 
flowEdgeContext.getOutputDatasetDescriptor();
+
+        //Are we done?
+        if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, 
destDatasetDescriptor)) {
+          return constructPath(flowEdgeContext);
+        }
 
-          //Expand the currentNode to its adjacent edges and add them to the 
queue.
-          List<FlowEdgeContext> nextEdges =
-              getNextEdges(currentNode, currentOutputDatasetDescriptor, 
destDatasetDescriptor);
-          for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
-            //Add a pointer from the child edge to the parent edge, if the 
child edge is not already in the
-            // queue.
-            if (!this.pathMap.containsKey(childFlowEdgeContext)) {
-              edgeQueue.add(childFlowEdgeContext);
-              this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
-            }
+        //Expand the currentNode to its adjacent edges and add them to the 
queue.
+        List<FlowEdgeContext> nextEdges =
+            getNextEdges(currentNode, currentOutputDatasetDescriptor, 
destDatasetDescriptor);
+        for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+          //Add a pointer from the child edge to the parent edge, if the child 
edge is not already in the
+          // queue.
+          if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+            edgeQueue.add(childFlowEdgeContext);
+            this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
           }
         }
       }
@@ -180,7 +201,8 @@ public class FlowGraphPathFinder {
       return null;
     } catch (SpecNotFoundException | JobTemplate.TemplateException | 
IOException | URISyntaxException e) {
       throw new PathFinderException(
-          "Exception encountered when computing path from src: " + 
this.srcNode.getId() + " to dest: " + this.destNode.getId(), e);
+          "Exception encountered when computing path from src: " + 
this.srcNode.getId() + " to dest: " + destNode
+              .getId(), e);
     }
   }
 
@@ -213,7 +235,7 @@ public class FlowGraphPathFinder {
 
         boolean foundExecutor = false;
         //Iterate over all executors for this edge. Find the first one that 
resolves the underlying flow template.
-        for (SpecExecutor specExecutor: flowEdge.getExecutors()) {
+        for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
           Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
           List<Pair<DatasetDescriptor, DatasetDescriptor>> 
datasetDescriptorPairs =
               
flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
@@ -226,10 +248,14 @@ public class FlowGraphPathFinder {
                 //If datasets described by the currentDatasetDescriptor is a 
subset of the datasets described
                 // by the outputDatasetDescriptor (i.e. 
currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
                 // as in the case of a "distcp" edge), we propagate the more 
"specific" dataset descriptor forward.
-                flowEdgeContext = new FlowEdgeContext(flowEdge, 
currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, specExecutor);
+                flowEdgeContext =
+                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, 
currentDatasetDescriptor, mergedConfig,
+                        specExecutor);
               } else {
                 //outputDatasetDescriptor is more specific (e.g. if it is a 
dataset transformation edge)
-                flowEdgeContext = new FlowEdgeContext(flowEdge, 
currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, specExecutor);
+                flowEdgeContext =
+                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, 
outputDatasetDescriptor, mergedConfig,
+                        specExecutor);
               }
               if 
(destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig()))
 {
                 //Add to the front of the edge list if platform-independent 
properties of the output descriptor is compatible
@@ -289,7 +315,7 @@ public class FlowGraphPathFinder {
    * @throws JobTemplate.TemplateException
    * @throws URISyntaxException
    */
-  private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext)
+  private List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext)
       throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
     //Backtrace from the last edge using the path map and push each edge into 
a LIFO data structure.
     List<FlowEdgeContext> path = new LinkedList<>();
@@ -303,8 +329,7 @@ public class FlowGraphPathFinder {
         break;
       }
     }
-    FlowGraphPath flowGraphPath = new FlowGraphPath(path, flowSpec, 
flowExecutionId);
-    return flowGraphPath;
+    return path;
   }
 
   public static class PathFinderException extends Exception {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index a281aa4..83951a0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -136,7 +136,7 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
         log.info(String.format("No path found from source: %s and destination: 
%s", source, destination));
         return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
       }
-    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | 
JobTemplate.TemplateException | URISyntaxException | IOException e) {
+    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | 
JobTemplate.TemplateException | URISyntaxException e) {
       Instrumented.markMeter(this.flowCompilationFailedMeter);
       log.error(String.format("Exception encountered while compiling flow for 
source: %s and destination: %s", source, destination), e);
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 58bbb81..3606897 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,10 +24,10 @@ import java.util.Map;
 
 import com.google.common.collect.Lists;
 
-import org.apache.gobblin.annotation.Alpha;
-
 import lombok.Getter;
 
+import org.apache.gobblin.annotation.Alpha;
+
 
 /**
  * An implementation of Dag. Assumes that nodes have unique values. Nodes with 
duplicate values will produce
@@ -100,7 +99,7 @@ public class Dag<T> {
    * @param other dag to concatenate to this dag
    * @return the concatenated dag
    */
-  public Dag<T> concatenate(Dag<T> other) throws IOException {
+  public Dag<T> concatenate(Dag<T> other) {
     if (other == null || other.isEmpty()) {
       return this;
     }
@@ -115,6 +114,36 @@ public class Dag<T> {
       }
       this.endNodes = other.endNodes;
     }
+    //Append all the entries from the other dag's parentChildMap to this dag's 
parentChildMap
+    for (Map.Entry<DagNode, List<DagNode<T>>> entry: 
other.parentChildMap.entrySet()) {
+      this.parentChildMap.put(entry.getKey(), entry.getValue());
+    }
+    this.nodes.addAll(other.nodes);
+    return this;
+  }
+
+  /**
+   * Merge the "other" dag to "this" dag and return "this" dag as a forest of 
the two dags.
+   * More specifically, the merge() operation takes two dags and returns a 
disjoint union of the two dags.
+   *
+   * @param other dag to merge to this dag
+   * @return the disjoint union of the two dags
+   */
+
+  public Dag<T> merge(Dag<T> other) {
+    if (other == null || other.isEmpty()) {
+      return this;
+    }
+    if (this.isEmpty()) {
+      return other;
+    }
+    //Append all the entries from the other dag's parentChildMap to this dag's 
parentChildMap
+    for (Map.Entry<DagNode, List<DagNode<T>>> entry: 
other.parentChildMap.entrySet()) {
+      this.parentChildMap.put(entry.getKey(), entry.getValue());
+    }
+    //Append the startNodes, endNodes and nodes from the other dag to this dag.
+    this.startNodes.addAll(other.startNodes);
+    this.endNodes.addAll(other.endNodes);
     this.nodes.addAll(other.nodes);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index b8bac02..c214b35 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -23,6 +23,7 @@ import java.io.InputStreamReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
@@ -121,7 +122,6 @@ public class MultiHopFlowCompilerTest {
         this.flowGraph.addFlowEdge(edge);
       }
     }
-
     this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
   }
 
@@ -153,7 +153,6 @@ public class MultiHopFlowCompilerTest {
     FlowSpec spec = flowSpecBuilder.build();
     return spec;
   }
-
   @Test
   public void testCompileFlow() throws URISyntaxException, IOException {
     FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
@@ -370,6 +369,37 @@ public class MultiHopFlowCompilerTest {
     Assert.assertTrue(jobDag.isEmpty());
   }
 
+  @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion")
+  public void testMulticastPath() throws IOException, URISyntaxException {
+    FlowSpec spec = createFlowSpec("flow/multicastFlow.conf", "LocalFS-1", 
"HDFS-3,HDFS-4");
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+
+    Assert.assertEquals(jobDag.getNodes().size(), 4);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 2);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 2);
+
+    int i = 1;
+    //First hop must be from LocalFS to HDFS-1 and HDFS-2
+    for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
+      JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+      Config jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+      Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), 
"file:///");
+      Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), 
"hdfs://hadoopnn0" + i++ + ".grid.linkedin.com:8888/");
+    }
+
+    i = 1;
+    //Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
+      List<Dag.DagNode<JobExecutionPlan>> nextNodes = 
jobDag.getChildren(dagNode);
+      Assert.assertEquals(nextNodes.size(), 1);
+      JobExecutionPlan jobExecutionPlan = nextNodes.get(0).getValue();
+      Config jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+      Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), 
"hdfs://hadoopnn0" + i + ".grid.linkedin.com:8888/");
+      Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), 
"hdfs://hadoopnn0" + (i + 2) + ".grid.linkedin.com:8888/");
+      i += 1;
+    }
+  }
+
   @AfterClass
   public void tearDown() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
index 3fc4fca..688f5de 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
@@ -23,12 +23,16 @@ import java.util.Set;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import org.testng.collections.Lists;
 
+import com.google.common.collect.Lists;
 
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
 public class DagTest {
   @Test
-  public void testInitialize() throws Exception {
+  public void testInitialize() {
     Dag.DagNode<String> dagNode1 = new Dag.DagNode<>("val1");
     Dag.DagNode<String> dagNode2 = new Dag.DagNode<>("val2");
     Dag.DagNode<String> dagNode3 = new Dag.DagNode<>("val3");
@@ -44,7 +48,7 @@ public class DagTest {
     List<Dag.DagNode<String>> dagNodeList = Lists.newArrayList(dagNode1, 
dagNode2, dagNode3, dagNode4, dagNode5);
     Dag<String> dag = new Dag<>(dagNodeList);
     //Test startNodes and endNodes
-    Assert.assertEquals(dag.getStartNodes().size(),1);
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
     Assert.assertEquals(dag.getStartNodes().get(0).getValue(), "val1");
     Assert.assertEquals(dag.getEndNodes().size(), 2);
     Assert.assertEquals(dag.getEndNodes().get(0).getValue(), "val4");
@@ -52,9 +56,9 @@ public class DagTest {
 
 
     Dag.DagNode startNode = dag.getStartNodes().get(0);
-    Assert.assertEquals(dag.getChildren(startNode).size(),2);
+    Assert.assertEquals(dag.getChildren(startNode).size(), 2);
     Set<String> childSet = new HashSet<>();
-    for(Dag.DagNode<String> node: dag.getChildren(startNode)) {
+    for (Dag.DagNode<String> node: dag.getChildren(startNode)) {
       childSet.add(node.getValue());
     }
     Assert.assertTrue(childSet.contains("val2"));
@@ -66,7 +70,7 @@ public class DagTest {
     Assert.assertEquals(dag.getChildren(dagNode2).size(), 1);
     Assert.assertEquals(dag.getChildren(dagNode2).get(0).getValue(), "val4");
 
-    for(Dag.DagNode<String> node: dag.getChildren(dagNode3)) {
+    for (Dag.DagNode<String> node: dag.getChildren(dagNode3)) {
       childSet.add(node.getValue());
     }
     Assert.assertTrue(childSet.contains("val4"));
@@ -78,7 +82,7 @@ public class DagTest {
   }
 
   @Test
-  public void testConcatenate() throws Exception {
+  public void testConcatenate() {
     Dag.DagNode<String> dagNode1 = new Dag.DagNode<>("val1");
     Dag.DagNode<String> dagNode2 = new Dag.DagNode<>("val2");
     Dag.DagNode<String> dagNode3 = new Dag.DagNode<>("val3");
@@ -105,7 +109,7 @@ public class DagTest {
     Dag<String> dagNew = dag1.concatenate(dag2);
 
     //Ensure end nodes of first dag are no longer end nodes
-    for(Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode6, dagNode7)) {
+    for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode6, dagNode7)) {
       Assert.assertEquals(dagNew.getParents(dagNode).size(), 2);
       Set<String> set = new HashSet<>();
       set.add(dagNew.getParents(dagNode).get(0).getValue());
@@ -114,7 +118,7 @@ public class DagTest {
       Assert.assertTrue(set.contains("val5"));
     }
 
-    for(Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5)) {
+    for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5)) {
       Assert.assertEquals(dagNew.getChildren(dagNode).size(), 2);
       Set<String> set = new HashSet<>();
       set.add(dagNew.getChildren(dagNode).get(0).getValue());
@@ -123,11 +127,88 @@ public class DagTest {
       Assert.assertTrue(set.contains("val7"));
     }
 
+    for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode6, dagNode7)) {
+      List<Dag.DagNode<String>> nextNodes = dagNew.getChildren(dagNode);
+      Assert.assertEquals(nextNodes.size(), 1);
+      Assert.assertEquals(nextNodes.get(0).getValue(), "val8");
+    }
+
     //Test new start and end nodes.
-    Assert.assertEquals(dagNew.getStartNodes().size(),1);
+    Assert.assertEquals(dagNew.getStartNodes().size(), 1);
     Assert.assertEquals(dagNew.getStartNodes().get(0).getValue(), "val1");
 
     Assert.assertEquals(dagNew.getEndNodes().size(), 1);
     Assert.assertEquals(dagNew.getEndNodes().get(0).getValue(), "val8");
   }
+
+  @Test
+  public void testMerge() {
+    Dag.DagNode<String> dagNode1 = new Dag.DagNode<>("val1");
+    Dag.DagNode<String> dagNode2 = new Dag.DagNode<>("val2");
+    Dag.DagNode<String> dagNode3 = new Dag.DagNode<>("val3");
+    Dag.DagNode<String> dagNode4 = new Dag.DagNode<>("val4");
+    Dag.DagNode<String> dagNode5 = new Dag.DagNode<>("val5");
+
+    dagNode2.addParentNode(dagNode1);
+    dagNode3.addParentNode(dagNode1);
+    dagNode4.addParentNode(dagNode2);
+    dagNode4.addParentNode(dagNode3);
+    dagNode5.addParentNode(dagNode3);
+
+    List<Dag.DagNode<String>> dagNodeList = Lists.newArrayList(dagNode1, 
dagNode2, dagNode3, dagNode4, dagNode5);
+    Dag<String> dag1 = new Dag<>(dagNodeList);
+
+    Dag.DagNode<String> dagNode6 = new Dag.DagNode<>("val6");
+    Dag.DagNode<String> dagNode7 = new Dag.DagNode<>("val7");
+    Dag.DagNode<String> dagNode8 = new Dag.DagNode<>("val8");
+    dagNode8.addParentNode(dagNode6);
+    dagNode8.addParentNode(dagNode7);
+    Dag<String> dag2 = new Dag<>(Lists.newArrayList(dagNode6, dagNode7, 
dagNode8));
+
+    //Merge the two dags
+    Dag<String> dagNew = dag1.merge(dag2);
+
+    //Test the startNodes
+    Assert.assertEquals(dagNew.getStartNodes().size(), 3);
+    for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode1, dagNode6, 
dagNode7)) {
+      Assert.assertTrue(dagNew.getStartNodes().contains(dagNode));
+      Assert.assertNull(dagNew.getParents(dagNode));
+      if (dagNode == dagNode1) {
+        List<Dag.DagNode<String>> nextNodes = dagNew.getChildren(dagNode);
+        Assert.assertEquals(nextNodes.size(), 2);
+        Assert.assertTrue(nextNodes.contains(dagNode2));
+        Assert.assertTrue(nextNodes.contains(dagNode3));
+      } else {
+        Assert.assertEquals(dagNew.getChildren(dagNode).size(), 1);
+        Assert.assertTrue(dagNew.getChildren(dagNode).contains(dagNode8));
+      }
+    }
+
+    //Test the endNodes
+    Assert.assertEquals(dagNew.getEndNodes().size(), 3);
+    for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5, 
dagNode8)) {
+      Assert.assertTrue(dagNew.getEndNodes().contains(dagNode));
+      Assert.assertNull(dagNew.getChildren(dagNode));
+      if (dagNode == dagNode8) {
+        Assert.assertEquals(dagNew.getParents(dagNode).size(), 2);
+        Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode6));
+        Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode7));
+      } else {
+        Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode3));
+        if (dagNode == dagNode4) {
+          Assert.assertEquals(dagNew.getParents(dagNode).size(), 2);
+          Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode2));
+        } else {
+          Assert.assertEquals(dagNew.getParents(dagNode).size(), 1);
+        }
+      }
+    }
+
+    //Test the other nodes
+    Assert.assertEquals(dagNew.getChildren(dagNode2).size(), 1);
+    Assert.assertTrue(dagNew.getChildren(dagNode2).contains(dagNode4));
+    Assert.assertEquals(dagNew.getChildren(dagNode3).size(), 2);
+    Assert.assertTrue(dagNew.getChildren(dagNode3).contains(dagNode4));
+    Assert.assertTrue(dagNew.getChildren(dagNode3).contains(dagNode5));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/resources/flow/multicastFlow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/multicastFlow.conf 
b/gobblin-service/src/test/resources/flow/multicastFlow.conf
new file mode 100644
index 0000000..1b92343
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/multicastFlow.conf
@@ -0,0 +1,19 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - same as input dataset
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.output.dataset.descriptor.format=${gobblin.flow.input.dataset.descriptor.format}
+gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec}
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd522277/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
 
b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
index 844dc92..c8a557e 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -26,7 +26,6 @@ target.filebased.fs.uri=${destination.data.node.fs.uri}
 writer.fs.uri=${target.filebased.fs.uri}
 
 work.dir=/tmp/${user.to.proxy}
-writer.user.to.proxy=${adls.user.to.proxy}
 
 # ====================================================================
 # Distcp configurations

Reply via email to