Port join library to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/620bd994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/620bd994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/620bd994
Branch: refs/heads/master
Commit: 620bd9949a6176ddd1903687fe9b8ba8c5822367
Parents: a1c06d7
Author: Kenneth Knowles <[email protected]>
Authored: Wed Aug 3 19:55:21 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
 .../apache/beam/sdk/extensions/joinlibrary/Join.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/620bd994/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
index 88836f9..f4e6ccb 100644
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.joinlibrary;
 import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -59,8 +59,8 @@ public class Join {
             .apply(CoGroupByKey.<K>create());
     return coGbkResultCollection.apply(ParDo.of(
-      new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           KV<K, CoGbkResult> e = c.element();
@@ -108,8 +108,8 @@ public class Join {
             .apply(CoGroupByKey.<K>create());
     return coGbkResultCollection.apply(ParDo.of(
-      new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           KV<K, CoGbkResult> e = c.element();
@@ -161,8 +161,8 @@ public class Join {
             .apply(CoGroupByKey.<K>create());
     return coGbkResultCollection.apply(ParDo.of(
-      new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           KV<K, CoGbkResult> e = c.element();
