Port ViewTest to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1db02d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1db02d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1db02d2 Branch: refs/heads/master Commit: b1db02d23f9454ff1a169d0aa81552e8dbe59fe3 Parents: 32f84bb Author: Kenneth Knowles <[email protected]> Authored: Fri Aug 5 12:07:28 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 11:35:17 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/ViewTest.java | 192 ++++++++++--------- 1 file changed, 97 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1db02d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index ee240bf..170e6ce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.values.KV.of; + import static com.google.common.base.Preconditions.checkArgument; import static org.hamcrest.Matchers.isA; @@ -100,8 +102,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("Create123", Create.of(1, 2, 3)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } @@ -131,8 +133,8 @@ public class ViewTest implements Serializable { TimestampedValue.of(3, new Instant(12)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } @@ -153,8 +155,8 @@ public class ViewTest implements Serializable { .apply(View.<Integer>asSingleton()); pipeline.apply("Create123", Create.of(1, 2, 3)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } @@ -178,8 +180,8 @@ public class ViewTest implements Serializable { final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton()); oneTwoThree.apply( - "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } @@ -205,8 +207,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29, 31)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { checkArgument(c.sideInput(view).size() == 4); checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0)); @@ -246,8 +248,8 @@ public class ViewTest implements Serializable { .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { checkArgument(c.sideInput(view).size() == 4); checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0)); @@ -274,8 +276,8 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertFalse(c.sideInput(view).iterator().hasNext()); @@ -283,7 +285,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -300,8 +302,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { try { c.sideInput(view).clear(); @@ -329,7 +331,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(11); pipeline.run(); @@ -347,8 +349,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29, 31)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { for (Integer i : c.sideInput(view)) { c.output(i); @@ -387,8 +389,8 @@ public class ViewTest implements Serializable { TimestampedValue.of(35, new Instant(11)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { for (Integer i : c.sideInput(view)) { c.output(i); @@ -413,15 +415,15 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { assertFalse(c.sideInput(view).iterator().hasNext()); c.output(1); } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -438,8 +440,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { Iterator<Integer> iterator = c.sideInput(view).iterator(); while (iterator.hasNext()) { @@ -453,7 +455,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(11); pipeline.run(); @@ -472,11 +474,11 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { - c.output(KV.of(c.element(), v)); + c.output(of(c.element(), v)); } } })); @@ -500,8 +502,8 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); assertEquals((int) c.element(), c.sideInput(view).entrySet().size()); @@ -554,11 +556,11 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { - c.output(KV.of(c.element(), v)); + c.output(of(c.element(), v)); } } })); @@ -591,13 +593,13 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, KV<String, Integer>>() { - @Override + new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view) .get(c.element().substring(0, 1))) { - c.output(KV.of(c.element(), v)); + c.output(of(c.element(), v)); } } })); @@ -629,8 +631,8 @@ public class ViewTest implements Serializable { TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<Integer, KV<String, Integer>>() { - @Override + new DoFn<Integer, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -674,13 +676,13 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, KV<String, Integer>>() { - @Override + new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view) .get(c.element().substring(0, 1))) { - c.output(KV.of(c.element(), v)); + c.output(of(c.element(), v)); } } })); @@ -704,8 +706,8 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertTrue(c.sideInput(view).entrySet().isEmpty()); @@ -714,7 +716,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -734,8 +736,8 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertTrue(c.sideInput(view).entrySet().isEmpty()); @@ -744,7 +746,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -763,8 +765,8 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { try { c.sideInput(view).clear(); @@ -792,7 +794,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); pipeline.run(); @@ -811,11 +813,11 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output( - KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); + of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } })); @@ -838,8 +840,8 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); assertEquals((int) c.element(), c.sideInput(view).entrySet().size()); @@ -870,11 +872,11 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output( - KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); + of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } })); @@ -906,8 +908,8 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, KV<String, Integer>>() { - @Override + new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of( c.element(), @@ -943,8 +945,8 @@ public class ViewTest implements Serializable { TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<Integer, KV<String, Integer>>() { - @Override + new DoFn<Integer, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -988,10 +990,10 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, KV<String, Integer>>() { - @Override + new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { - c.output(KV.of( + c.output(of( c.element(), c.sideInput(view).get( c.element().substring(0, 1)))); @@ -1017,8 +1019,8 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertTrue(c.sideInput(view).entrySet().isEmpty()); @@ -1027,7 +1029,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -1046,8 +1048,8 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertTrue(c.sideInput(view).entrySet().isEmpty()); @@ -1056,7 +1058,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -1080,8 +1082,8 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output( KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); @@ -1111,8 +1113,8 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { try { c.sideInput(view).clear(); @@ -1139,7 +1141,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that OldDoFn executes. + // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); pipeline.run(); @@ -1158,8 +1160,8 @@ public class ViewTest implements Serializable { PCollection<KV<String, Integer>> output = pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply("Output", - ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); @@ -1193,8 +1195,8 @@ public class ViewTest implements Serializable { TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, String>() { - @Override + new DoFn<String, String>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } @@ -1226,8 +1228,8 @@ public class ViewTest implements Serializable { TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, String>() { - @Override + new DoFn<String, String>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } @@ -1257,8 +1259,8 @@ public class ViewTest implements Serializable { TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( - new OldDoFn<String, String>() { - @Override + new DoFn<String, String>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } @@ -1287,8 +1289,8 @@ public class ViewTest implements Serializable { p.apply("CreateMainInput", Create.of("")) .apply( "OutputMainAndSideInputs", - ParDo.withSideInputs(view).of(new OldDoFn<String, String>() { - @Override + ParDo.withSideInputs(view).of(new DoFn<String, String>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } @@ -1305,8 +1307,8 @@ public class ViewTest implements Serializable { Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view1 = pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of())) - .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() { - @Override + .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(17); } @@ -1317,8 +1319,8 @@ public class ViewTest implements Serializable { pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of())) .apply( "OutputSideInput", - ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() { - @Override + ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view1)); } @@ -1328,8 +1330,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of())) .apply("ReadIterableSideInput", - ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() { - @Override + ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() { + @ProcessElement public void processElement(ProcessContext c) { for (Iterable<Integer> input : c.sideInput(view2)) { for (Integer i : input) {
