MapReduceRunner: add Graph and its visitors.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/092380cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/092380cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/092380cf

Branch: refs/heads/mr-runner
Commit: 092380cf87ada9d4a2b5faa7a42dff7005c44f17
Parents: 9fffd55
Author: Pei He <[email protected]>
Authored: Tue Jul 11 10:45:11 2017 +0800
Committer: Pei He <[email protected]>
Committed: Thu Aug 31 14:13:46 2017 +0800

----------------------------------------------------------------------
 .../runners/mapreduce/translation/Graph.java    | 190 +++++++++++++++++++
 .../mapreduce/translation/GraphConverter.java   |  40 ++++
 .../mapreduce/translation/GraphPlanner.java     |  99 ++++++++++
 .../mapreduce/translation/GraphVisitor.java     |  11 ++
 .../MapReducePipelineTranslator.java            |  11 --
 5 files changed, 340 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
new file mode 100644
index 0000000..a9831bd
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -0,0 +1,190 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Directed graph whose vertices wrap {@link PTransform}s. Created by peihe on 06/07/2017.
+ */
+public class Graph {
+
+  private final Map<PTransform<?, ?>, Vertex> vertices; // transform -> its vertex
+  private final Map<HeadTail, Edge> edges; // at most one edge per (head, tail) pair
+  private final Set<Vertex> leafVertices; // vertices that are not the head of any edge
+
+  public Graph() {
+    this.vertices = Maps.newHashMap();
+    this.edges = Maps.newHashMap();
+    this.leafVertices = Sets.newHashSet();
+  }
+
+  public Vertex addVertex(PTransform<?, ?> transform) { // fails if transform is already present
+    checkState(!vertices.containsKey(transform));
+    Vertex v = new Vertex(transform);
+    vertices.put(transform, v);
+    leafVertices.add(v); // a new vertex is a leaf until addEdge uses it as a head
+    return v;
+  }
+
+  public Edge addEdge(Vertex head, Vertex tail) { // fails if this head->tail edge already exists
+    HeadTail headTail = HeadTail.of(head, tail);
+    checkState(!edges.containsKey(headTail));
+    Edge e = new Edge(headTail);
+    edges.put(headTail, e);
+    head.addOutgoing(e);
+    tail.addIncoming(e);
+    leafVertices.remove(head); // head now has a consumer, so it is no longer a leaf
+    return e;
+  }
+
+  public Vertex getVertex(PTransform<?, ?> transform) { // null if the transform has no vertex
+    return vertices.get(transform);
+  }
+
+  public Edge getEdge(Vertex head, Vertex tail) { // null if there is no head->tail edge
+    return edges.get(HeadTail.of(head, tail));
+  }
+
+  public Set<Vertex> getLeafVertices() { // returns the live internal set, not a copy
+    return leafVertices;
+  }
+
+  public void accept(GraphVisitor visitor) { // dispatches leaves only; upstream is up to visitor
+    for (Vertex v : leafVertices) {
+      v.accept(visitor);
+    }
+  }
+
+  //TODO: add equals, hashCode, toString for following classes.
+
+  public static class Vertex {
+    private final PTransform<?, ?> transform;
+    private final Set<Edge> incoming; // edges whose tail is this vertex
+    private final Set<Edge> outgoing; // edges whose head is this vertex
+
+    public Vertex(PTransform<?, ?> transform) {
+      this.transform = checkNotNull(transform, "transform");
+      this.incoming = Sets.newHashSet();
+      this.outgoing = Sets.newHashSet();
+    }
+
+    public PTransform<?, ?> getTransform() {
+      return transform;
+    }
+
+    public Set<Edge> getIncoming() { // live internal set
+      return incoming;
+    }
+
+    public Set<Edge> getOutgoing() { // live internal set
+      return outgoing;
+    }
+
+    public boolean isSource() {
+      return transform instanceof Read.Bounded || transform instanceof Read.Unbounded;
+    }
+
+    public boolean isGroupByKey() {
+      return transform instanceof GroupByKey;
+    }
+
+    public void addIncoming(Edge edge) {
+      incoming.add(edge);
+    }
+
+    public void addOutgoing(Edge edge) {
+      outgoing.add(edge);
+    }
+
+    public void accept(GraphVisitor visitor) { // picks the visit method by transform type
+      if (transform instanceof ParDo.SingleOutput || transform instanceof ParDo.MultiOutput) {
+        visitor.visitParDo(this);
+      } else if (transform instanceof GroupByKey) {
+        visitor.visitGroupByKey(this);
+      } else if (transform instanceof Read.Bounded) {
+        visitor.visitRead(this);
+      } else if (transform instanceof Flatten.PCollections
+          || transform instanceof Flatten.Iterables) {
+        visitor.visitFlatten(this);
+      } else { // also reached for Read.Unbounded, which has no visit method
+        throw new RuntimeException("Unexpected transform type: " + transform.getClass());
+      }
+    }
+  }
+
+  public static class Edge {
+    private final HeadTail headTail;
+    private final Set<NodePath> paths; // paths attached through addPath
+
+    public static Edge of(Vertex head, Vertex tail) {
+      return of(HeadTail.of(head, tail));
+    }
+
+    public static Edge of(HeadTail headTail) {
+      return new Edge(headTail);
+    }
+
+    private Edge(HeadTail headTail) {
+      this.headTail = checkNotNull(headTail, "headTail");
+      this.paths = Sets.newHashSet();
+    }
+
+    public Vertex getHead() {
+      return headTail.getHead();
+    }
+
+    public Vertex getTail() {
+      return headTail.getTail();
+    }
+
+    public Set<NodePath> getPaths() { // live internal set
+      return paths;
+    }
+
+    public void addPath(NodePath path) { // stores the reference; the path is not copied
+      paths.add(checkNotNull(path, "path"));
+    }
+  }
+
+  public static class NodePath {
+    private final LinkedList<PTransform<?, ?>> path; // ordered transforms, grown at either end
+
+    public NodePath() {
+      this.path = new LinkedList<>();
+    }
+
+    public NodePath(NodePath nodePath) { // copies the list, so the two paths can diverge
+      this.path = new LinkedList<>(nodePath.path);
+    }
+
+    public void addFirst(PTransform<?, ?> transform) {
+      path.addFirst(transform);
+    }
+
+    public void addLast(PTransform<?, ?> transform) {
+      path.addLast(transform);
+    }
+  }
+
+  @AutoValue
+  public abstract static class HeadTail { // (head, tail) pair used as the key of the edges map
+    abstract Vertex getHead();
+    abstract Vertex getTail();
+
+    public static HeadTail of(Vertex head, Vertex tail) {
+      return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_HeadTail(head, tail);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
new file mode 100644
index 0000000..306e58e
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -0,0 +1,40 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.mapreduce.MapReduceRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Converts a pipeline's primitive transforms into a {@link Graph} for {@link MapReduceRunner}.
+ */
+public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
+
+  private final Map<PValue, Graph.Vertex> outputToProducer; // output value -> producing vertex
+  private final Graph graph; // NOTE(review): no accessor exposes this graph in this class yet
+
+  public GraphConverter() {
+    this.outputToProducer = Maps.newHashMap();
+    this.graph = new Graph();
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    Graph.Vertex v = graph.addVertex(node.getTransform()); // register it so leaves are tracked
+
+    for (PValue input : node.getInputs().values()) {
+      Graph.Vertex producer = outputToProducer.get(input); // null if no visited node produced it
+      if (producer != null && graph.getEdge(producer, v) == null) { // one edge per producer
+        graph.addEdge(producer, v);
+      }
+    }
+
+    for (PValue output : node.getOutputs().values()) {
+      outputToProducer.put(output, v);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
new file mode 100644
index 0000000..d4fa2d9
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -0,0 +1,99 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Plans a fused {@link Graph} from an initial one. Created by peihe on 06/07/2017.
+ */
+public class GraphPlanner {
+
+  public Graph plan(Graph initGraph) {
+    FusionVisitor fusionVisitor = new FusionVisitor();
+    initGraph.accept(fusionVisitor); // Graph.accept dispatches every leaf vertex of initGraph
+    return fusionVisitor.getFusedGraph();
+  }
+
+  private static class FusionVisitor implements GraphVisitor { // static: uses no planner state
+
+    private Graph fusedGraph;
+    private Graph.Vertex workingVertex; // fused vertex that the current upstream walk feeds
+    private Graph.NodePath workingPath; // transforms collected between parent and workingVertex
+
+    FusionVisitor() {
+      fusedGraph = new Graph();
+      workingVertex = null;
+      workingPath = null;
+    }
+
+    @Override
+    public void visitRead(Graph.Vertex read) {
+      if (workingVertex == null || read.getOutgoing().isEmpty()) {
+        // drop if read is leaf vertex.
+        return;
+      }
+      Graph.Vertex v = fusedGraph.addVertex(read.getTransform());
+      workingPath.addFirst(read.getTransform());
+      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      edge.addPath(workingPath);
+    }
+
+    @Override
+    public void visitParDo(Graph.Vertex parDo) {
+      checkArgument(
+          parDo.getTransform().getAdditionalInputs().isEmpty(),
+          "Side inputs are not supported.");
+      if (workingVertex == null || parDo.getOutgoing().isEmpty()) {
+        // Leaf vertex: start a new fused vertex instead of extending the previous leaf's path.
+        workingVertex = fusedGraph.addVertex(parDo.getTransform());
+        workingPath = new Graph.NodePath();
+      } else {
+        workingPath.addFirst(parDo.getTransform());
+      }
+      checkArgument(
+          parDo.getIncoming().size() == 1,
+          "Side inputs are not supported.");
+      processParent(parDo.getIncoming().iterator().next().getHead());
+    }
+
+    @Override
+    public void visitFlatten(Graph.Vertex flatten) {
+      if (workingVertex == null || flatten.getOutgoing().isEmpty()) { // leaf flatten is dropped
+        return;
+      }
+      Graph.NodePath basePath = workingPath;
+      Graph.Vertex baseVertex = workingVertex;
+      for (Graph.Edge e : flatten.getIncoming()) {
+        workingPath = new Graph.NodePath(basePath); // each input branch extends its own copy
+        workingVertex = baseVertex;
+        processParent(e.getHead());
+      }
+    }
+
+    @Override
+    public void visitGroupByKey(Graph.Vertex groupByKey) {
+      if (workingVertex == null || groupByKey.getOutgoing().isEmpty()) { // leaf GBK is dropped
+        return;
+      }
+      Graph.Vertex v = fusedGraph.addVertex(groupByKey.getTransform());
+      workingPath.addFirst(groupByKey.getTransform());
+      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      edge.addPath(workingPath); // NOTE(review): the walk stops here; GBK parents are not visited
+    }
+
+    public Graph getFusedGraph() {
+      return fusedGraph;
+    }
+
+    private void processParent(Graph.Vertex parent) {
+      Graph.Vertex v = fusedGraph.getVertex(parent.getTransform());
+      if (v == null) {
+        parent.accept(this);
+        return;
+      }
+      // TODO: parent is consumed more than once and duplicated per outgoing path; figure out impact.
+      workingPath.addFirst(parent.getTransform());
+      Graph.Edge edge = fusedGraph.getEdge(v, workingVertex); // null when v gains a new consumer
+      (edge != null ? edge : fusedGraph.addEdge(v, workingVertex)).addPath(workingPath);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
new file mode 100644
index 0000000..fe4a76f
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
@@ -0,0 +1,11 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+/**
+ * Visitor with one callback per kind of {@link Graph.Vertex}. Created by peihe on 06/07/2017.
+ */
+public interface GraphVisitor {
+  void visitRead(Graph.Vertex read); // Vertex.accept calls this for Read.Bounded
+  void visitParDo(Graph.Vertex parDo); // ... for ParDo.SingleOutput and ParDo.MultiOutput
+  void visitFlatten(Graph.Vertex flatten); // ... for Flatten.PCollections and Flatten.Iterables
+  void visitGroupByKey(Graph.Vertex groupByKey); // ... for GroupByKey
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java
deleted file mode 100644
index b4a2e7c..0000000
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.beam.runners.mapreduce.translation;
-
-import org.apache.beam.runners.mapreduce.MapReduceRunner;
-import org.apache.beam.sdk.Pipeline;
-
-/**
- * Pipeline translator for {@link MapReduceRunner}.
- */
-public class MapReducePipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
-}

Reply via email to