Port some of ParDoTest to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30940179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30940179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30940179

Branch: refs/heads/master
Commit: 3094017956913b583a9bd8be5ce685683b591669
Parents: c2e751f
Author: Kenneth Knowles <[email protected]>
Authored: Fri Oct 21 12:35:03 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/DoFn.java   |  7 ++++---
 .../java/org/apache/beam/sdk/util/StringUtils.java  |  2 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java   | 16 +++++++---------
 3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 11ca853..018877f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -216,9 +215,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
       String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
 
   /**
-   * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+   * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this
+   * context.
    *
-   * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+   * <p>This method should be called by runners before the {@link StartBundle @StartBundle}
+   * method.
    */
   @Experimental(Kind.AGGREGATOR)
   protected final void setupDelegateAggregators() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 4f81eef..1c52c1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -96,7 +96,7 @@ public class StringUtils {
   }
 
   private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "Fn"};
+      new String[]{"OldDoFn", "DoFn", "Fn"};
 
   /**
    * Pattern to match a non-anonymous inner class.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52244a0..d3ea9fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -90,13 +90,11 @@ public class ParDoTest implements Serializable {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
-  private static class PrintingOldDoFn extends OldDoFn<String, String> implements
-      RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
+  private static class PrintingDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       c.output(c.element() + ":" + c.timestamp().getMillis()
-          + ":" + c.window().maxTimestamp().getMillis());
+          + ":" + window.maxTimestamp().getMillis());
     }
   }
 
@@ -848,7 +846,7 @@ public class ParDoTest implements Serializable {
           output5.getName());
     }
 
-    assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
+    assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
 
     assertEquals(
         "ParMultiDo(SideOutputDummy)",
@@ -1381,7 +1379,7 @@ public class ParDoTest implements Serializable {
                 System.out.println("Finish: 3");
               }
             }))
-        .apply(ParDo.of(new PrintingOldDoFn()));
+        .apply(ParDo.of(new PrintingDoFn()));
 
     PAssert.that(output).satisfies(new Checker());
