Port easy Java SDK tests to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1959ddbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1959ddbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1959ddbe Branch: refs/heads/master Commit: 1959ddbedb2ad61824bf28e1e9139cc677a2aaf5 Parents: f5011e5 Author: Kenneth Knowles <[email protected]> Authored: Wed Aug 3 20:15:12 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Aug 4 14:56:42 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/PipelineTest.java | 8 ++--- .../apache/beam/sdk/coders/AvroCoderTest.java | 6 ++-- .../beam/sdk/coders/CoderRegistryTest.java | 10 +++--- .../beam/sdk/coders/SerializableCoderTest.java | 10 +++--- .../apache/beam/sdk/io/CountingInputTest.java | 6 ++-- .../apache/beam/sdk/io/CountingSourceTest.java | 6 ++-- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 6 ++-- .../sdk/transforms/ApproximateUniqueTest.java | 6 ++-- .../beam/sdk/transforms/CombineFnsTest.java | 4 +-- .../apache/beam/sdk/transforms/CombineTest.java | 18 +++++------ .../apache/beam/sdk/transforms/CreateTest.java | 4 +-- .../apache/beam/sdk/transforms/FlattenTest.java | 8 ++--- .../beam/sdk/transforms/GroupByKeyTest.java | 8 ++--- .../beam/sdk/transforms/WithTimestampsTest.java | 12 +++---- .../display/DisplayDataEvaluatorTest.java | 10 +++--- .../sdk/transforms/display/DisplayDataTest.java | 6 ++-- .../sdk/transforms/join/CoGroupByKeyTest.java | 34 ++++++++++---------- .../sdk/transforms/windowing/WindowTest.java | 10 +++--- .../sdk/transforms/windowing/WindowingTest.java | 23 ++++++------- .../beam/sdk/values/PCollectionTupleTest.java | 6 ++-- .../apache/beam/sdk/values/TypedPValueTest.java | 10 +++--- 21 files changed, 106 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index 5137031..8b86499 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.UserCodeException; @@ -146,9 +146,9 @@ public class PipelineTest { private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix( final String suffix) { - return ParDo.of(new OldDoFn<String, String>() { - @Override - public void processElement(OldDoFn<String, String>.ProcessContext c) { + return ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) { c.output(c.element() + suffix); } }); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 41d0932..3b13e35 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.SerializableUtils; @@ -134,8 +134,8 @@ public class AvroCoderTest { } } - private static class GetTextFn extends OldDoFn<Pojo, String> { - @Override + private static class GetTextFn extends DoFn<Pojo, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().text); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 35ec6c6..da15405 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; @@ -366,8 +366,8 @@ public class CoderRegistryTest { private static class PTransformOutputingMySerializableGeneric extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> { - private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> { - @Override + private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> { + @ProcessElement public void processElement(ProcessContext c) { } } @@ -430,8 +430,8 @@ public class CoderRegistryTest { PCollection<String>, PCollection<KV<String, MySerializableGeneric<T>>>> { - private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> { - @Override + private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> { + @ProcessElement public void processElement(ProcessContext c) { } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java index 3e7fd50..b5465fa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.CoderUtils; @@ -82,15 +82,15 @@ public class SerializableCoderTest implements Serializable { } } - static class StringToRecord extends OldDoFn<String, MyRecord> { - @Override + static class StringToRecord extends DoFn<String, MyRecord> { + @ProcessElement public void processElement(ProcessContext c) { c.output(new MyRecord(c.element())); } } - static class RecordToString extends OldDoFn<MyRecord, String> { - @Override + static class RecordToString extends DoFn<MyRecord, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().value); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index 95f7454..4ec2c9a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -120,8 +120,8 @@ public class CountingInputTest { assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); } - private static class ElementValueDiff extends OldDoFn<Long, Long> { - @Override + private static class ElementValueDiff extends DoFn<Long, Long> { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 45f636f..0bd91c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -159,8 +159,8 @@ public class CountingSourceTest { p.run(); } - private static class ElementValueDiff extends OldDoFn<Long, Long> { - @Override + private static class ElementValueDiff extends DoFn<Long, Long> { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index f8592c9..db03a5c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; @@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest { private static final String ID_LABEL = "id"; private static final int NUM_SHARDS = 10; - private static class Stamp extends OldDoFn<String, String> { - @Override + private static class Stamp extends DoFn<String, String> { + @ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index 5c8732f..7b6d671 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -54,7 +54,7 @@ import java.util.List; */ @RunWith(JUnit4.class) public class ApproximateUniqueTest implements Serializable { - // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses + // implements Serializable just to make it easy to use anonymous inner DoFn subclasses @Test public void testEstimationErrorToSampleSize() { @@ -223,8 +223,8 @@ public class ApproximateUniqueTest implements Serializable { .apply(View.<Long>asSingleton()); PCollection<KV<Long, Long>> approximateAndExact = approximate - .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() { - @Override + .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.sideInput(exact))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index d6bf826..95ba1aa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -461,7 +461,7 @@ public class CombineFnsTest { } private static class ExtractResultDoFn - extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> { + extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> { private final TupleTag<Integer> maxIntTag; private final TupleTag<UserString> concatStringTag; @@ -471,7 +471,7 @@ public class CombineFnsTest { this.concatStringTag = concatStringTag; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { UserString userString = c.element().getValue().get(concatStringTag); KV<Integer, String> value = KV.of( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index cb9928e..6421b3b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -117,7 +117,7 @@ public class CombineTest implements Serializable { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55 }; - @Mock private OldDoFn<?, ?>.ProcessContext processContext; + @Mock private DoFn<?, ?>.ProcessContext processContext; PCollection<KV<String, Integer>> createInput(Pipeline p, KV<String, Integer>[] table) { @@ -372,8 +372,8 @@ public class CombineTest implements Serializable { pipeline.run(); } - private static class FormatPaneInfo extends OldDoFn<Integer, String> { - @Override + private static class FormatPaneInfo extends DoFn<Integer, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + ": " + c.pane().isLast()); } @@ -560,8 +560,8 @@ public class CombineTest implements Serializable { pipeline.run(); } - private static class GetLast extends OldDoFn<Integer, Integer> { - @Override + private static class GetLast extends DoFn<Integer, Integer> { + @ProcessElement public void processElement(ProcessContext c) { if (c.pane().isLast()) { c.output(c.element()); @@ -653,8 +653,8 @@ public class CombineTest implements Serializable { PCollection<Integer> output = pipeline .apply("CreateVoidMainInput", Create.of((Void) null)) - .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() { - @Override + .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } @@ -1176,8 +1176,8 @@ public class CombineTest implements Serializable { } private static <T> PCollection<T> copy(PCollection<T> pc, final int n) { - return pc.apply(ParDo.of(new OldDoFn<T, T>() { - @Override + return pc.apply(ParDo.of(new DoFn<T, T>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { for (int i = 0; i < n; i++) { c.output(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index cf65423..9db0136 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -229,8 +229,8 @@ public class CreateTest { p.run(); } - private static class PrintTimestamps extends OldDoFn<String, String> { - @Override + private static class PrintTimestamps extends DoFn<String, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + ":" + c.timestamp().getMillis()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index b81eedb..604536b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -130,8 +130,8 @@ public class FlattenTest implements Serializable { PCollection<String> output = p .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() { - @Override + .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() { + @ProcessElement public void processElement(ProcessContext c) { for (String side : c.sideInput(view)) { c.output(side); @@ -339,8 +339,8 @@ public class FlattenTest implements Serializable { ///////////////////////////////////////////////////////////////////////////// - private static class IdentityFn<T> extends OldDoFn<T, T> { - @Override + private static class IdentityFn<T> extends DoFn<T, T> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 15c3ba8..afe460f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -371,14 +371,14 @@ public class GroupByKeyTest { pipeline.run(); } - private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> { + private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> { private final Instant timestamp; public AssertTimestamp(Instant timestamp) { this.timestamp = timestamp; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat(c.timestamp(), equalTo(timestamp)); } @@ -506,9 +506,9 @@ public class GroupByKeyTest { * Creates a KV that wraps the original KV together with a random key. */ static class AssignRandomKey - extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> { + extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> { - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java index d2ba452..e381470 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java @@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable { .apply(WithTimestamps.of(timestampFn)); PCollection<KV<String, Instant>> timestampedVals = - timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() { - @Override - public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c) + timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { + @ProcessElement + public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } @@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable { WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L))); PCollection<KV<String, Instant>> timestampedVals = - timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() { - @Override - public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c) + timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { + @ProcessElement + public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java index c1848c6..e233114 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java @@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -50,8 +50,8 @@ public class DisplayDataEvaluatorTest implements Serializable { new PTransform<PCollection<String>, POutput> () { @Override public PCollection<String> apply(PCollection<String> input) { - return input.apply(ParDo.of(new OldDoFn<String, String>() { - @Override + return input.apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -79,8 +79,8 @@ public class DisplayDataEvaluatorTest implements Serializable { @Test public void testPrimitiveTransform() { PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of( - new OldDoFn<Integer, Integer>() { - @Override + new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception {} @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 517f968..e2f38b4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -1053,8 +1053,8 @@ public class DisplayDataTest implements Serializable { private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> { @Override public PCollection<T> apply(PCollection<T> input) { - return input.apply(ParDo.of(new OldDoFn<T, T>() { - @Override + return input.apply(ParDo.of(new DoFn<T, T>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index 97667a3..c6f82ec 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -29,9 +29,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -84,10 +83,11 @@ public class CoGroupByKeyTest implements Serializable { input = p.apply("Create" + name, Create.timestamped(list, timestamps) .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); } - return input - .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>, - KV<Integer, String>>() { - @Override + return input.apply( + "Identity" + name, + ParDo.of( + new DoFn<KV<Integer, String>, KV<Integer, String>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element()); } @@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable { } /** - * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the - * results of a CoGroupByKey. + * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a + * CoGroupByKey. */ - private static class ClickOfPurchaseFn extends - OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess { + private static class ClickOfPurchaseFn + extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> { private final TupleTag<String> clicksTag; private final TupleTag<String> purchasesTag; @@ -329,9 +329,9 @@ public class CoGroupByKeyTest implements Serializable { this.purchasesTag = purchasesTag; } - @Override - public void processElement(ProcessContext c) { - BoundedWindow w = c.window(); + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + BoundedWindow w = window; KV<Integer, CoGbkResult> e = c.element(); CoGbkResult row = e.getValue(); Iterable<String> clicks = row.getAll(clicksTag); @@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable { /** - * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the + * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the * results of a CoGroupByKey. */ private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends - OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> { + DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> { private final TupleTag<String> purchasesTag; private final TupleTag<String> addressesTag; @@ -367,7 +367,7 @@ public class CoGroupByKeyTest implements Serializable { this.namesTag = namesTag; } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV<Integer, CoGbkResult> e = c.element(); CoGbkResult row = e.getValue(); @@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable { } /** - * Tests that the consuming OldDoFn + * Tests that the consuming DoFn * (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 27d2539..c583860 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; @@ -199,8 +199,8 @@ public class WindowTest implements Serializable { .apply(GroupByKey.<Integer, String>create()) .apply( ParDo.of( - new OldDoFn<KV<Integer, Iterable<String>>, Void>() { - @Override + new DoFn<KV<Integer, Iterable<String>>, Void>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat( c.timestamp(), @@ -231,8 +231,8 @@ public class WindowTest implements Serializable { .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) .apply(GroupByKey.<Integer, String>create()) - .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() { - @Override + .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 622a277..159e700 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -26,9 +26,8 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -58,12 +57,14 @@ public class WindowingTest implements Serializable { private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> { - private final class FormatCountsDoFn - extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ":" + c.element().getValue() - + ":" + c.timestamp().getMillis() + ":" + c.window()); + private final class FormatCountsDoFn extends DoFn<KV<String, Long>, String> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output( + c.element().getKey() + + ":" + c.element().getValue() + + ":" + c.timestamp().getMillis() + + ":" + window); } } private WindowFn<? super String, ?> windowFn; @@ -234,9 +235,9 @@ public class WindowingTest implements Serializable { p.run(); } - /** A OldDoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> { - @Override + /** A DoFn that tokenizes lines of text into individual words. */ + static class ExtractWordsWithTimestampsFn extends DoFn<String, String> { + @ProcessElement public void processElement(ProcessContext c) { String[] words = c.element().split("[^a-zA-Z0-9']+"); if (words.length == 2) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java index 547c778..13218b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -75,8 +75,8 @@ public final class PCollectionTupleTest implements Serializable { .apply(Create.of(inputs)); PCollectionTuple outputs = mainInput.apply(ParDo - .of(new OldDoFn<Integer, Integer>() { - @Override + .of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element()); }}) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index c525cf1..287223f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.junit.Rule; @@ -44,9 +44,9 @@ public class TypedPValueTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private static class IdentityDoFn extends OldDoFn<Integer, Integer> { + private static class IdentityDoFn extends DoFn<Integer, Integer> { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -129,9 +129,9 @@ public class TypedPValueTest { static class EmptyClass { } - private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> { + private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(new EmptyClass()); }
