Port Reshuffle to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf21a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf21a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf21a5c

Branch: refs/heads/master
Commit: ecf21a5cc177c39e515e4c78e16b579ac298c999
Parents: d798413
Author: Kenneth Knowles <[email protected]>
Authored: Fri Aug 5 11:47:23 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf21a5c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 66c7cc0..ad33a25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,8 +70,8 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
         .apply("ExpandIterable", ParDo.of(
-            new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() {
-              @Override
+            new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 K key = c.element().getKey();
                 for (V value : c.element().getValue()) {
