Port Flink fork of examples to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87313f1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87313f1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87313f1c Branch: refs/heads/master Commit: 87313f1c3d8cf874e04aaf528161478afa030f38 Parents: ae1f6d1 Author: Kenneth Knowles <k...@google.com> Authored: Fri Aug 5 12:24:24 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Aug 8 11:35:17 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/flink/examples/TFIDF.java | 28 +++++++-------- .../beam/runners/flink/examples/WordCount.java | 5 +-- .../flink/examples/streaming/AutoComplete.java | 37 ++++++++++---------- .../flink/examples/streaming/JoinExamples.java | 14 ++++---- .../examples/streaming/KafkaIOExamples.java | 7 ++-- .../KafkaWindowedWordCountExample.java | 10 +++--- .../examples/streaming/WindowedWordCount.java | 10 +++--- 7 files changed, 57 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index 716c8ad..4deca12 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.PTransform; @@ -230,10 +230,10 @@ public class TFIDF { // Create a collection of pairs mapping a URI to each // of the words in the document associated with that that URI. PCollection<KV<URI, String>> uriToWords = uriToContent - .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() { + .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { URI uri = c.element().getKey(); String line = c.element().getValue(); @@ -275,10 +275,10 @@ public class TFIDF { // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount .apply("ShiftKeys", ParDo.of( - new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { URI uri = c.element().getKey().getKey(); String word = c.element().getKey().getValue(); @@ -316,10 +316,10 @@ public class TFIDF { // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal .apply("ComputeTermFrequencies", ParDo.of( - new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { URI uri = c.element().getKey(); Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); @@ -339,14 +339,14 @@ public class TFIDF { // documents in which the word appears divided by the total // number of documents in the corpus. Note how the total number of // documents is passed as a side input; the same value is - // presented to each invocation of the OldDoFn. + // presented to each invocation of the DoFn. PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) - .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() { + .of(new DoFn<KV<String, Long>, KV<String, Double>>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { String word = c.element().getKey(); Long documentCount = c.element().getValue(); @@ -375,10 +375,10 @@ public class TFIDF { return wordToUriAndTfAndDf .apply("ComputeTfIdf", ParDo.of( - new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { String word = c.element().getKey(); Double df = c.element().getValue().getOnly(dfTag); @@ -416,10 +416,10 @@ public class TFIDF { @Override public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { return wordToUriAndTfIdf - .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() { + .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { c.output(String.format("%s,\t%s,\t%f", c.element().getKey(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index 080cdc9..fdffd39 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -38,11 +39,11 @@ import org.apache.beam.sdk.values.PCollection; public class WordCount { - public static class ExtractWordsFn extends OldDoFn<String, String> { + public static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java index 068404a..aff1a35 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -92,10 +93,10 @@ public class AutoComplete { // Map the KV outputs of Count into our own CompletionCandiate class. .apply("CreateCompletionCandidates", ParDo.of( - new OldDoFn<KV<String, Long>, CompletionCandidate>() { + new DoFn<KV<String, Long>, CompletionCandidate>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); c.output(cand); @@ -182,10 +183,10 @@ public class AutoComplete { } private static class FlattenTops - extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { for (CompletionCandidate cc : c.element().getValue()) { c.output(cc); @@ -236,10 +237,10 @@ public class AutoComplete { } /** - * A OldDoFn that keys each candidate by all its prefixes. + * A DoFn that keys each candidate by all its prefixes. */ private static class AllPrefixes - extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { private static final long serialVersionUID = 0; private final int minPrefix; @@ -251,7 +252,7 @@ public class AutoComplete { this.minPrefix = minPrefix; this.maxPrefix = maxPrefix; } - @Override + @ProcessElement public void processElement(ProcessContext c) { String word = c.element().value; for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { @@ -314,11 +315,11 @@ public class AutoComplete { } } - static class ExtractWordsFn extends OldDoFn<String, String> { + static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L); @@ -337,21 +338,21 @@ public class AutoComplete { } /** - * Takes as input a the top candidates per prefix, and emits an entity - * suitable for writing to Datastore. + * Takes as input a the top candidates per prefix, and emits an entity suitable for writing to + * Datastore. */ - static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String> - implements OldDoFn.RequiresWindowAccess{ + static class FormatForPerTaskLocalFile + extends DoFn<KV<String, List<CompletionCandidate>>, String> { private static final long serialVersionUID = 0; - @Override - public void processElement(ProcessContext c) { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { StringBuilder str = new StringBuilder(); KV<String, List<CompletionCandidate>> elem = c.element(); - str.append(elem.getKey() +" @ "+ c.window() +" -> "); - for(CompletionCandidate cand: elem.getValue()) { + str.append(elem.getKey() +" @ "+ window +" -> "); + for (CompletionCandidate cand: elem.getValue()) { str.append(cand.toString() + " "); } System.out.println(str.toString()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java index 7d7c0c7..458a263 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -76,10 +76,10 @@ public class JoinExamples { // country code 'key' -> string of <event info>, <country name> PCollection<KV<String, String>> finalResultCollection = kvpCollection.apply("Process", ParDo.of( - new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() { + new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { KV<String, CoGbkResult> e = c.element(); String key = e.getKey(); @@ -98,10 +98,10 @@ public class JoinExamples { })); return finalResultCollection - .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() { + .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { String result = c.element().getKey() + " -> " + c.element().getValue(); System.out.println(result); @@ -110,10 +110,10 @@ public class JoinExamples { })); } - static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> { + static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) { String line = c.element().toLowerCase(); String key = line.split("\\s")[0]; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java index 395b409..68a9edc 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java @@ -30,9 +30,10 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; @@ -326,9 +327,9 @@ public class KafkaIOExamples { * Print contents to stdout * @param <T> type of the input */ - private static class PrintFn<T> extends OldDoFn<T, T> { + private static class PrintFn<T> extends DoFn<T, T> { - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { System.out.println(c.element().toString()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index 8c31783..39ce225 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; @@ -49,11 +49,11 @@ public class KafkaWindowedWordCountExample { static final String GROUP_ID = "myGroup"; // Default groupId static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - public static class ExtractWordsFn extends OldDoFn<String, String> { + public static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L); @@ -71,8 +71,8 @@ public class KafkaWindowedWordCountExample { } } - public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> { - @Override + public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @ProcessElement public void processElement(ProcessContext c) { String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); System.out.println(row); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java index d149e4e..fe8e627 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; @@ -59,19 +59,19 @@ public class WindowedWordCount { static final long WINDOW_SIZE = 10; // Default window duration in seconds static final long SLIDE_SIZE = 5; // Default window slide in seconds - static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> { - @Override + static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @ProcessElement public void processElement(ProcessContext c) { String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); c.output(row); } } - static class ExtractWordsFn extends OldDoFn<String, String> { + static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L);