Port PAssert to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef5e31f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef5e31f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef5e31f8 Branch: refs/heads/master Commit: ef5e31f8b79dcedf8feb4bba0e313bfcf330ab1e Parents: 1959ddb Author: Kenneth Knowles <[email protected]> Authored: Wed Aug 3 20:15:58 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Aug 4 14:56:42 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/testing/PAssert.java | 39 ++++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef5e31f8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 80340c2..e07ee3d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -33,11 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -762,8 +761,8 @@ public class PAssert { .apply("RewindowActuals", rewindowActuals.<T>windowActuals()) .apply( ParDo.of( - new OldDoFn<T, T>() { - @Override + new DoFn<T, T>() { + @ProcessElement public void processElement(ProcessContext context) throws CoderException { context.output(CoderUtils.clone(coder, context.element())); } @@ -884,8 +883,8 @@ public class PAssert { } } - private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> { - @Override + private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(Iterables.concat(c.element())); } @@ -995,13 +994,13 @@ public class PAssert { } /** - * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. * * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support * null values. */ - private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> { + private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1015,7 +1014,7 @@ public class PAssert { this.actual = actual; } - @Override + @ProcessElement public void processElement(ProcessContext c) { try { ActualT actualContents = c.sideInput(actual); @@ -1030,13 +1029,13 @@ public class PAssert { } /** - * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of * the single iterable element of the input {@link PCollection} and adjusts counters and * thrown exceptions for use in testing. * * <p>The singleton property is presumed, not enforced. */ - private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> { + private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1047,7 +1046,7 @@ public class PAssert { this.checkerFn = checkerFn; } - @Override + @ProcessElement public void processElement(ProcessContext c) { try { doChecks(c.element(), checkerFn, success, failure); @@ -1061,14 +1060,14 @@ public class PAssert { } /** - * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of * the single item contained within the single iterable on input and * adjusts counters and thrown exceptions for use in testing. * * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However, * each input element must be a singleton iterable, or this will fail. */ - private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> { + private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1079,7 +1078,7 @@ public class PAssert { this.checkerFn = checkerFn; } - @Override + @ProcessElement public void processElement(ProcessContext c) { try { ActualT actualContents = Iterables.getOnlyElement(c.element()); @@ -1310,7 +1309,7 @@ public class PAssert { } /** - * A OldDoFn that filters elements based on their presence in a static collection of windows. + * A DoFn that filters elements based on their presence in a static collection of windows. */ private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> { private final StaticWindows windows; @@ -1324,10 +1323,10 @@ public class PAssert { return input.apply("FilterWindows", ParDo.of(new Fn())); } - private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess { - @Override - public void processElement(ProcessContext c) throws Exception { - if (windows.getWindows().contains(c.window())) { + private class Fn extends DoFn<T, T> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + if (windows.getWindows().contains(window)) { c.output(c.element()); } }
