MapReduceRunner: add Graph and its visitors.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/092380cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/092380cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/092380cf

Branch: refs/heads/mr-runner
Commit: 092380cf87ada9d4a2b5faa7a42dff7005c44f17
Parents: 9fffd55
Author: Pei He <[email protected]>
Authored: Tue Jul 11 10:45:11 2017 +0800
Committer: Pei He <[email protected]>
Committed: Thu Aug 31 14:13:46 2017 +0800

----------------------------------------------------------------------
 .../runners/mapreduce/translation/Graph.java    | 190 +++++++++++++++++++
 .../mapreduce/translation/GraphConverter.java   |  50 +++++
 .../mapreduce/translation/GraphPlanner.java     | 127 +++++++++++++
 .../mapreduce/translation/GraphVisitor.java     |  11 ++
 .../MapReducePipelineTranslator.java            |  11 --
 5 files changed, 378 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
new file mode 100644
index 0000000..a9831bd
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -0,0 +1,190 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Created by peihe on 06/07/2017.
+ */
+public class Graph {
+
+  private final Map<PTransform, Vertex> vertices;
+  private final Map<HeadTail, Edge> edges;
+  private final Set<Vertex> leafVertices;
+
+  public Graph() {
+    this.vertices = Maps.newHashMap();
+    this.edges = Maps.newHashMap();
+    this.leafVertices = Sets.newHashSet();
+  }
+
+  public Vertex addVertex(PTransform<?, ?> transform) {
+    checkState(!vertices.containsKey(transform));
+    Vertex v = new Vertex(transform);
+    vertices.put(transform, v);
+    leafVertices.add(v);
+    return v;
+  }
+
+  public Edge addEdge(Vertex head, Vertex tail) {
+    HeadTail headTail = HeadTail.of(head, tail);
+    checkState(!edges.containsKey(headTail));
+    Edge e = new Edge(headTail);
+    edges.put(headTail, e);
+    head.addOutgoing(e);
+    tail.addIncoming(e);
+    leafVertices.remove(head);
+    return e;
+  }
+
+  public Vertex getVertex(PTransform<?, ?> transform) {
+    return vertices.get(transform);
+  }
+
+  public Edge getEdge(Vertex head, Vertex tail) {
+    return edges.get(HeadTail.of(head, tail));
+  }
+
+  public Set<Vertex> getLeafVertices() {
+    return leafVertices;
+  }
+
+  public void accept(GraphVisitor visitor) {
+    for (Vertex v : leafVertices) {
+      v.accept(visitor);
+    }
+  }
+
+  //TODO: add equals, hashCode, toString for following classses.
+
+  public static class Vertex {
+    private final PTransform<?, ?> transform;
+    private final Set<Edge> incoming;
+    private final Set<Edge> outgoing;
+
+    public Vertex(PTransform transform) {
+      this.transform = checkNotNull(transform, "transform");
+      this.incoming = Sets.newHashSet();
+      this.outgoing = Sets.newHashSet();
+    }
+
+    public PTransform<?, ?> getTransform() {
+      return transform;
+    }
+
+    public Set<Edge> getIncoming() {
+      return incoming;
+    }
+
+    public Set<Edge> getOutgoing() {
+      return outgoing;
+    }
+
+    public boolean isSource() {
+      return transform instanceof Read.Bounded || transform instanceof Read.Unbounded;
+    }
+
+    public boolean isGroupByKey() {
+      return transform instanceof GroupByKey;
+    }
+
+    public void addIncoming(Edge edge) {
+      incoming.add(edge);
+    }
+
+    public void addOutgoing(Edge edge) {
+      outgoing.add(edge);
+    }
+
+    public void accept(GraphVisitor visitor) {
+      if (transform instanceof ParDo.SingleOutput || transform instanceof ParDo.MultiOutput) {
+        visitor.visitParDo(this);
+      } else if (transform instanceof GroupByKey) {
+        visitor.visitGroupByKey(this);
+      } else if (transform instanceof Read.Bounded) {
+        visitor.visitRead(this);
+      } else if (transform instanceof Flatten.PCollections
+          || transform instanceof Flatten.Iterables) {
+        visitor.visitFlatten(this);
+      } else {
+        throw new RuntimeException("Unexpected transform type: " + transform.getClass());
+      }
+    }
+  }
+
+  public static class Edge {
+    private final HeadTail headTail;
+    private final Set<NodePath> paths;
+
+    public static Edge of(Vertex head, Vertex tail) {
+      return of(HeadTail.of(head, tail));
+    }
+
+    public static Edge of(HeadTail headTail) {
+      return new Edge(headTail);
+    }
+
+    private Edge(HeadTail headTail) {
+      this.headTail = checkNotNull(headTail, "headTail");
+      this.paths = Sets.newHashSet();
+    }
+
+    public Vertex getHead() {
+      return headTail.getHead();
+    }
+
+    public Vertex getTail() {
+      return headTail.getTail();
+    }
+
+    public Set<NodePath> getPaths() {
+      return paths;
+    }
+
+    public void addPath(NodePath path) {
+      paths.add(checkNotNull(path, "path"));
+    }
+  }
+
+  public static class NodePath {
+    private final LinkedList<PTransform<?, ?>> path;
+
+    public NodePath() {
+      this.path = new LinkedList<>();
+    }
+
+    public NodePath(NodePath nodePath) {
+      this.path = new LinkedList<>(nodePath.path);
+    }
+
+    public void addFirst(PTransform<?, ?> transform) {
+      path.addFirst(transform);
+    }
+
+    public void addLast(PTransform<?, ?> transform) {
+      path.addLast(transform);
+    }
+  }
+
+  @AutoValue
+  public abstract static class HeadTail {
+    abstract Vertex getHead();
+    abstract Vertex getTail();
+
+    public static HeadTail of(Vertex head, Vertex tail) {
+      return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_HeadTail(head, tail);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
new file mode 100644
index 0000000..306e58e
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -0,0 +1,50 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.mapreduce.MapReduceRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Converts a {@link Pipeline} into a {@link Graph} for {@link MapReduceRunner}.
+ *
+ * <p>Every primitive transform becomes a vertex, with an edge from the producer of each of its
+ * inputs, when that producer has already been visited.
+ */
+public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
+
+  private final Map<PValue, Graph.Vertex> outputToProducer;
+  private final Graph graph;
+
+  public GraphConverter() {
+    this.outputToProducer = Maps.newHashMap();
+    this.graph = new Graph();
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    // Register the vertex with the graph: a detached Graph.Vertex would be invisible to
+    // Graph.getVertex() and Graph.getLeafVertices(), and thus to every GraphVisitor.
+    Graph.Vertex v = graph.addVertex(node.getTransform());
+
+    for (PValue input : node.getInputs().values()) {
+      if (outputToProducer.containsKey(input)) {
+        Graph.Vertex producer = outputToProducer.get(input);
+        graph.addEdge(producer, v);
+      }
+    }
+
+    for (PValue output : node.getOutputs().values()) {
+      outputToProducer.put(output, v);
+    }
+  }
+
+  /** Returns the graph built from the primitive transforms visited so far. */
+  public Graph getGraph() {
+    return graph;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
new file mode 100644
index 0000000..d4fa2d9
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -0,0 +1,127 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Fuses a {@link Graph} of primitive transforms into a smaller graph for MapReduce planning.
+ *
+ * <p>Vertices of the fused graph are reads, {@code GroupByKey}s and leaf {@code ParDo}s; each
+ * edge records the {@link Graph.NodePath}s of transforms that connect its head to its tail.
+ */
+public class GraphPlanner {
+
+  /** Returns the fused form of {@code initGraph}, which is left unmodified. */
+  public Graph plan(Graph initGraph) {
+    FusionVisitor fusionVisitor = new FusionVisitor();
+    initGraph.accept(fusionVisitor);
+    return fusionVisitor.getFusedGraph();
+  }
+
+  /** Walks upstream from each leaf, folding ParDo chains into the paths of fused edges. */
+  private static class FusionVisitor implements GraphVisitor {
+
+    private Graph fusedGraph;
+    private Graph.Vertex workingVertex;
+    private Graph.NodePath workingPath;
+
+    FusionVisitor() {
+      fusedGraph = new Graph();
+      workingVertex = null;
+      workingPath = null;
+    }
+
+    @Override
+    public void visitRead(Graph.Vertex read) {
+      if (workingVertex == null) {
+        // drop if read is leaf vertex.
+        return;
+      }
+      Graph.Vertex v = fusedGraph.addVertex(read.getTransform());
+      workingPath.addFirst(read.getTransform());
+      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      edge.addPath(workingPath);
+    }
+
+    @Override
+    public void visitParDo(Graph.Vertex parDo) {
+      checkArgument(
+          parDo.getTransform().getAdditionalInputs().isEmpty(),
+          "Side inputs are not supported.");
+      boolean isLeaf = workingVertex == null;
+      if (isLeaf) {
+        workingVertex = fusedGraph.addVertex(parDo.getTransform());
+        workingPath = new Graph.NodePath();
+      } else {
+        workingPath.addFirst(parDo.getTransform());
+      }
+      checkArgument(
+          parDo.getIncoming().size() == 1,
+          "ParDo is expected to have exactly one input, but has %s.",
+          parDo.getIncoming().size());
+      processParent(parDo.getIncoming().iterator().next().getHead());
+      if (isLeaf) {
+        // Graph.accept() hands every leaf to this same visitor: clear the working state so the
+        // next leaf starts its own fused vertex instead of extending this leaf's path.
+        workingVertex = null;
+        workingPath = null;
+      }
+    }
+
+    @Override
+    public void visitFlatten(Graph.Vertex flatten) {
+      if (workingVertex == null) {
+        return;
+      }
+      Graph.NodePath basePath = workingPath;
+      Graph.Vertex baseVertex = workingVertex;
+      for (Graph.Edge e : flatten.getIncoming()) {
+        workingPath = new Graph.NodePath(basePath);
+        workingVertex = baseVertex;
+        processParent(e.getHead());
+      }
+    }
+
+    @Override
+    public void visitGroupByKey(Graph.Vertex groupByKey) {
+      if (workingVertex == null) {
+        return;
+      }
+      Graph.Vertex v = fusedGraph.addVertex(groupByKey.getTransform());
+      workingPath.addFirst(groupByKey.getTransform());
+      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      edge.addPath(workingPath);
+
+      // The GroupByKey is a fused vertex itself: keep planning upstream with it as the working
+      // vertex, otherwise everything that feeds it (including the reads) is dropped.
+      checkArgument(
+          groupByKey.getIncoming().size() == 1,
+          "GroupByKey is expected to have exactly one input, but has %s.",
+          groupByKey.getIncoming().size());
+      workingVertex = v;
+      workingPath = new Graph.NodePath();
+      processParent(groupByKey.getIncoming().iterator().next().getHead());
+    }
+
+    public Graph getFusedGraph() {
+      return fusedGraph;
+    }
+
+    private void processParent(Graph.Vertex parent) {
+      Graph.Vertex v = fusedGraph.getVertex(parent.getTransform());
+      if (v == null) {
+        parent.accept(this);
+      } else {
+        // TODO: parent is consumed more than once.
+        // It is duplicated in multiple outgoing path. Figure out the impact.
+        workingPath.addFirst(parent.getTransform());
+        // The parent may so far only feed other fused vertices, so the edge to the current
+        // working vertex is created on first use instead of assuming it exists.
+        Graph.Edge edge = fusedGraph.getEdge(v, workingVertex);
+        if (edge == null) {
+          edge = fusedGraph.addEdge(v, workingVertex);
+        }
+        edge.addPath(workingPath);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
new file mode 100644
index 0000000..fe4a76f
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
@@ -0,0 +1,11 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+/**
+ * Created by peihe on 06/07/2017.
+ */
+public interface GraphVisitor {
+  void visitRead(Graph.Vertex read);
+  void visitParDo(Graph.Vertex parDo);
+  void visitFlatten(Graph.Vertex flatten);
+  void visitGroupByKey(Graph.Vertex groupByKey);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java
deleted file mode 100644
index b4a2e7c..0000000
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReducePipelineTranslator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.beam.runners.mapreduce.translation;
-
-import org.apache.beam.runners.mapreduce.MapReduceRunner;
-import org.apache.beam.sdk.Pipeline;
-
-/**
- * Pipeline translator for {@link MapReduceRunner}.
- */
-public class MapReducePipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
-}
