MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8b366de Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8b366de Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8b366de Branch: refs/heads/mr-runner Commit: a8b366de9e4e0c79a7800184afc79b377477b8ed Parents: 092380c Author: Pei He <[email protected]> Authored: Thu Jul 13 14:09:10 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Aug 31 14:13:46 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/mapreduce/MapReduceRunner.java | 12 +++ .../runners/mapreduce/translation/Graph.java | 89 ++++++++++++++- .../mapreduce/translation/GraphConverter.java | 6 +- .../mapreduce/translation/GraphPlanner.java | 1 + .../beam/runners/mapreduce/WordCountTest.java | 108 +++++++++++++++++++ .../translation/GraphConverterTest.java | 39 +++++++ .../mapreduce/translation/GraphPlannerTest.java | 42 ++++++++ 7 files changed, 294 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index bb9555e..247a8e5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -3,11 +3,23 @@ package org.apache.beam.runners.mapreduce; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; /** * {@link PipelineRunner} for crunch. */ public class MapReduceRunner extends PipelineRunner<PipelineResult> { + + /** + * Construct a runner from the provided options. + * + * @param options Properties which configure the runner. + * @return The newly created runner. + */ + public static MapReduceRunner fromOptions(PipelineOptions options) { + return new MapReduceRunner(); + } + @Override public PipelineResult run(Pipeline pipeline) { return null; http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index a9831bd..1ca5a05 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -4,16 +4,21 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.LinkedList; import java.util.Map; +import java.util.Objects; import java.util.Set; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; /** * Created by peihe on 06/07/2017. @@ -57,8 +62,16 @@ public class Graph { return edges.get(HeadTail.of(head, tail)); } - public Set<Vertex> getLeafVertices() { - return leafVertices; + public Iterable<Vertex> getAllVertices() { + return vertices.values(); + } + + public Iterable<Edge> getAllEdges() { + return edges.values(); + } + + public Iterable<Vertex> getLeafVertices() { + return ImmutableList.copyOf(leafVertices); } public void accept(GraphVisitor visitor) { @@ -122,6 +135,29 @@ public class Graph { throw new RuntimeException("Unexpected transform type: " + transform.getClass()); } } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof Vertex) { + Vertex other = (Vertex) obj; + return transform.equals(other.transform); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(this.getClass(), transform); + } + + @Override + public String toString() { + return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .setExcludeFieldNames(new String[] { "outgoing", "incoming" }).toString(); + } } public static class Edge { @@ -156,6 +192,28 @@ public class Graph { public void addPath(NodePath path) { paths.add(checkNotNull(path, "path")); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof Edge) { + Edge other = (Edge) obj; + return headTail.equals(other.headTail) && paths.equals(paths); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(headTail, paths); + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } } public static class NodePath { @@ -176,6 +234,33 @@ public class Graph { public void addLast(PTransform<?, ?> transform) { path.addLast(transform); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof NodePath) { + NodePath other = (NodePath) obj; + return path.equals(other.path); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(this.getClass(), path.hashCode()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (PTransform<?, ?> collect : path) { + sb.append(collect.getName() + "|"); + } + // sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } } @AutoValue http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java index 306e58e..359a6e2 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java @@ -24,7 +24,7 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - Graph.Vertex v = new Graph.Vertex(node.getTransform()); + Graph.Vertex v = graph.addVertex(node.getTransform()); for (PValue input : node.getInputs().values()) { if (outputToProducer.containsKey(input)) { @@ -37,4 +37,8 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { outputToProducer.put(output, v); } } + + public Graph getGraph() { + return graph; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index d4fa2d9..793efd7 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -78,6 +78,7 @@ public class GraphPlanner { workingPath.addFirst(groupByKey.getTransform()); Graph.Edge edge = fusedGraph.addEdge(v, workingVertex); edge.addPath(workingPath); + processParent(groupByKey.getIncoming().iterator().next().getHead()); } public Graph getFusedGraph() { http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java new file mode 100644 index 0000000..51c26f2 --- /dev/null +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java @@ -0,0 +1,108 @@ +package org.apache.beam.runners.mapreduce; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test that runs WordCount. + */ +@RunWith(JUnit4.class) +public class WordCountTest { + + public static final String TOKENIZER_PATTERN = "[^\\p{L}]+"; + + /** + * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns + * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it + * to a ParDo in the pipeline. + */ + static class ExtractWordsFn extends DoFn<String, String> { + private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); + + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.inc(); + } + + // Split the line into words. + String[] words = c.element().split(TOKENIZER_PATTERN); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + /** A SimpleFunction that converts a Word and Count into a printable string. */ + public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { + @Override + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); + } + } + + /** + * A PTransform that converts a PCollection containing lines of text into a PCollection of + * formatted word counts. + * + * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and + * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, + * modular testing, and an improved monitoring experience. + */ + public static class CountWords extends PTransform<PCollection<String>, + PCollection<KV<String, Long>>> { + @Override + public PCollection<KV<String, Long>> expand(PCollection<String> lines) { + + // Convert lines of text into individual words. + PCollection<String> words = lines.apply( + ParDo.of(new ExtractWordsFn())); + + // Count the number of times each word occurs. + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + return wordCounts; + } + } + + @Test + public void testWordCount() { + String input = "gs://apache-beam-samples/shakespeare/kinglear.txt"; + String output = "./output"; + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(MapReduceRunner.class); + Pipeline p = Pipeline.create(options); + + // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the + // static FormatAsTextFn() to the ParDo transform. + p.apply("ReadLines", TextIO.read().from(input)) + .apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())) + .apply("WriteCounts", TextIO.write().to(output)); + + p.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java new file mode 100644 index 0000000..4f0c283 --- /dev/null +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java @@ -0,0 +1,39 @@ +package org.apache.beam.runners.mapreduce.translation; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GraphConverter}. + */ +@RunWith(JUnit4.class) +public class GraphConverterTest { + + @Test + public void testCombine() throws Exception { + Pipeline p = Pipeline.create(); + PCollection<KV<String, Integer>> input = p + .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Sum.<String>integersPerKey()); + GraphConverter graphConverter = new GraphConverter(); + p.traverseTopologically(graphConverter); + + Graph graph = graphConverter.getGraph(); + + assertEquals(3, Iterables.size(graph.getAllVertices())); + assertEquals(2, Iterables.size(graph.getAllEdges())); + assertEquals(1, Iterables.size(graph.getLeafVertices())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java new file mode 100644 index 0000000..c98f817 --- /dev/null +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java @@ -0,0 +1,42 @@ +package org.apache.beam.runners.mapreduce.translation; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GraphPlanner}. + */ +@RunWith(JUnit4.class) +public class GraphPlannerTest { + + @Test + public void testCombine() throws Exception { + Pipeline p = Pipeline.create(); + PCollection<KV<String, Integer>> input = p + .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Sum.<String>integersPerKey()); + GraphConverter graphConverter = new GraphConverter(); + p.traverseTopologically(graphConverter); + + Graph graph = graphConverter.getGraph(); + + GraphPlanner planner = new GraphPlanner(); + Graph fusedGraph = planner.plan(graph); + + assertEquals(3, Iterables.size(fusedGraph.getAllVertices())); + assertEquals(2, Iterables.size(fusedGraph.getAllEdges())); + assertEquals(1, Iterables.size(fusedGraph.getLeafVertices())); + } +}
