Port most of Combine to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/331f5234 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/331f5234 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/331f5234 Branch: refs/heads/master Commit: 331f523461094af666a20bd97e1e15f1dec3feba Parents: b1db02d Author: Kenneth Knowles <[email protected]> Authored: Fri Aug 5 12:11:11 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 11:35:17 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/Combine.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/331f5234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 6fc2324..a825800 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1473,9 +1473,9 @@ public class Combine { PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline() .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of())) .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of( - new OldDoFn<Void, OutputT>() { - @Override - public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) { + new DoFn<Void, OutputT>() { + @ProcessElement + public void processElement(ProcessContext c) { Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator(); if (!combined.hasNext()) { c.output(defaultValue); @@ -2097,15 +2097,15 @@ public class Combine { final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>(); final TupleTag<KV<K, InputT>> cold = new TupleTag<>(); PCollectionTuple split = input.apply("AddNonce", ParDo.of( - new OldDoFn<KV<K, InputT>, KV<K, InputT>>() { + new DoFn<KV<K, InputT>, KV<K, InputT>>() { transient int counter; - @Override + @StartBundle public void startBundle(Context c) { counter = ThreadLocalRandom.current().nextInt( Integer.MAX_VALUE); } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV<K, InputT> kv = c.element(); int spread = Math.max(1, hotKeyFanout.apply(kv.getKey())); @@ -2135,9 +2135,9 @@ public class Combine { .setWindowingStrategyInternal(preCombineStrategy) .apply("PreCombineHot", Combine.perKey(hotPreCombine)) .apply("StripNonce", ParDo.of( - new OldDoFn<KV<KV<K, Integer>, AccumT>, + new DoFn<KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() { - @Override + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of( c.element().getKey().getKey(), @@ -2151,8 +2151,8 @@ public class Combine { .get(cold) .setCoder(inputCoder) .apply("PrepareCold", ParDo.of( - new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { - @Override + new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element().getKey(), InputOrAccum.<InputT, AccumT>input(c.element().getValue())));
