Repository: incubator-beam Updated Branches: refs/heads/gearpump-runner 02b2248a5 -> 091a15a07
Port various Spark runner tests to new DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5df3583 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5df3583 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5df3583 Branch: refs/heads/gearpump-runner Commit: f5df358320cfde6a1c4d012d4169af691f6a18e9 Parents: d6395e9 Author: Kenneth Knowles <[email protected]> Authored: Fri Aug 5 12:31:07 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 11:35:17 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/spark/TfIdfTest.java | 22 ++++++++++---------- .../spark/translation/CombinePerKeyTest.java | 6 +++--- .../translation/MultiOutputWordCountTest.java | 10 ++++----- .../spark/translation/SerializationTest.java | 10 ++++----- .../streaming/KafkaStreamingTest.java | 6 +++--- 5 files changed, 27 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java index 074e6aa..17bf6dd 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -101,8 +101,8 @@ public class TfIdfTest { // of the words in the document associated with that that URI. PCollection<KV<URI, String>> uriToWords = uriToContent .apply("SplitWords", ParDo.of( - new OldDoFn<KV<URI, String>, KV<URI, String>>() { - @Override + new DoFn<KV<URI, String>, KV<URI, String>>() { + @ProcessElement public void processElement(ProcessContext c) { URI uri = c.element().getKey(); String line = c.element().getValue(); @@ -144,8 +144,8 @@ public class TfIdfTest { // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount .apply("ShiftKeys", ParDo.of( - new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { - @Override + new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + @ProcessElement public void processElement(ProcessContext c) { URI uri = c.element().getKey().getKey(); String word = c.element().getKey().getValue(); @@ -183,8 +183,8 @@ public class TfIdfTest { // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal .apply("ComputeTermFrequencies", ParDo.of( - new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { - @Override + new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + @ProcessElement public void processElement(ProcessContext c) { URI uri = c.element().getKey(); Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); @@ -208,8 +208,8 @@ public class TfIdfTest { PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) - .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() { - @Override + .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + @ProcessElement public void processElement(ProcessContext c) { String word = c.element().getKey(); Long documentCount = c.element().getValue(); @@ -237,8 +237,8 @@ public class TfIdfTest { // divided by the log of the document frequency. return wordToUriAndTfAndDf .apply("ComputeTfIdf", ParDo.of( - new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { - @Override + new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + @ProcessElement public void processElement(ProcessContext c) { String word = c.element().getKey(); Double df = c.element().getValue().getOnly(dfTag); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index dee9213..cdf2cfb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -70,8 +70,8 @@ public class CombinePerKeyTest { private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> { @Override public PCollection<KV<T, Long>> apply(PCollection<T> pcol) { - PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() { - @Override + PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() { + @ProcessElement public void processElement(ProcessContext processContext) throws Exception { processContext.output(KV.of(processContext.element(), 1L)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 066521b..291f7b2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.ApproximateUnique; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -105,7 +105,7 @@ public class MultiOutputWordCountTest { /** * A OldDoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsFn extends OldDoFn<String, String> { + static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords", new Sum.SumIntegerFn()); @@ -117,7 +117,7 @@ public class MultiOutputWordCountTest { this.regex = regex; } - @Override + @ProcessElement public void processElement(ProcessContext c) { String[] words = c.element().split(c.sideInput(regex)); for (String word : words) { @@ -170,8 +170,8 @@ public class MultiOutputWordCountTest { } } - private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> { - @Override + private static class FormatCountsFn extends DoFn<KV<String, Long>, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getKey() + ": " + c.element().getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index fb97b8b..019b107 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -147,12 +147,12 @@ public class SerializationTest { /** * A OldDoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> { + static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> { private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+"); private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { // Split the line into words. String[] words = WORD_BOUNDARY.split(c.element().toString()); @@ -175,8 +175,8 @@ public class SerializationTest { /** * A OldDoFn that converts a Word and Count into a printable string. */ - private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> { - @Override + private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> { + @ProcessElement public void processElement(ProcessContext c) { c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index fa98ca3..17044aa 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -121,8 +121,8 @@ public class KafkaStreamingTest { EMBEDDED_ZOOKEEPER.shutdown(); } - private static class FormatKVFn extends OldDoFn<KV<String, String>, String> { - @Override + private static class FormatKVFn extends DoFn<KV<String, String>, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getKey() + "," + c.element().getValue()); }
