Port AutoComplete example from OldDoFn to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3236eec2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3236eec2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3236eec2 Branch: refs/heads/master Commit: 3236eec22a8902393e6becefb771b9a4768ccc50 Parents: 49d2f17 Author: Kenneth Knowles <k...@google.com> Authored: Fri Jul 22 14:29:37 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Aug 3 18:25:53 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/complete/AutoComplete.java | 30 ++++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3236eec2/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index 7b44af8..1ab39c9 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -36,9 +36,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Partition; @@ -130,8 +130,8 @@ public class AutoComplete { // Map the KV outputs of Count into our own CompletionCandiate class. .apply("CreateCompletionCandidates", ParDo.of( - new OldDoFn<KV<String, Long>, CompletionCandidate>() { - @Override + new DoFn<KV<String, Long>, CompletionCandidate>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue())); } @@ -209,8 +209,8 @@ public class AutoComplete { } private static class FlattenTops - extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { - @Override + extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + @ProcessElement public void processElement(ProcessContext c) { for (CompletionCandidate cc : c.element().getValue()) { c.output(cc); @@ -260,10 +260,10 @@ public class AutoComplete { } /** - * A OldDoFn that keys each candidate by all its prefixes. + * A DoFn that keys each candidate by all its prefixes. */ private static class AllPrefixes - extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { private final int minPrefix; private final int maxPrefix; public AllPrefixes(int minPrefix) { @@ -273,8 +273,8 @@ public class AutoComplete { this.minPrefix = minPrefix; this.maxPrefix = maxPrefix; } - @Override - public void processElement(ProcessContext c) { + @ProcessElement + public void processElement(ProcessContext c) { String word = c.element().value; for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { c.output(KV.of(word.substring(0, i), c.element())); @@ -341,8 +341,8 @@ public class AutoComplete { /** * Takes as input a set of strings, and emits each #hashtag found therein. */ - static class ExtractHashtags extends OldDoFn<String, String> { - @Override + static class ExtractHashtags extends DoFn<String, String> { + @ProcessElement public void processElement(ProcessContext c) { Matcher m = Pattern.compile("#\\S+").matcher(c.element()); while (m.find()) { @@ -351,8 +351,8 @@ public class AutoComplete { } } - static class FormatForBigquery extends OldDoFn<KV<String, List<CompletionCandidate>>, TableRow> { - @Override + static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> { + @ProcessElement public void processElement(ProcessContext c) { List<TableRow> completions = new ArrayList<>(); for (CompletionCandidate cc : c.element().getValue()) { @@ -385,14 +385,14 @@ public class AutoComplete { * Takes as input a the top candidates per prefix, and emits an entity * suitable for writing to Datastore. */ - static class FormatForDatastore extends OldDoFn<KV<String, List<CompletionCandidate>>, Entity> { + static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> { private String kind; public FormatForDatastore(String kind) { this.kind = kind; } - @Override + @ProcessElement public void processElement(ProcessContext c) { Entity.Builder entityBuilder = Entity.newBuilder(); Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build();