Separates side input test and side output test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a51bdd26 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a51bdd26 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a51bdd26 Branch: refs/heads/DSL_SQL Commit: a51bdd266f9c877cb407de986a465fc9c7de76ff Parents: a9bcc8b Author: Eugene Kirpichov <[email protected]> Authored: Sat Apr 15 16:38:35 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 18:02:06 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/SplittableDoFnTest.java | 63 ++++++++++++++------ 1 file changed, 44 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a51bdd26/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 9e8c12e..30329f4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -61,7 +62,7 @@ import org.junit.runners.JUnit4; * Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior. */ @RunWith(JUnit4.class) -public class SplittableDoFnTest { +public class SplittableDoFnTest implements Serializable { static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> { @ProcessElement @@ -216,22 +217,18 @@ public class SplittableDoFnTest { p.run(); } - private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> { + private static class SDFWithSideInput extends DoFn<Integer, String> { private final PCollectionView<String> sideInput; - private final TupleTag<String> additionalOutput; - private SDFWithSideInputsAndOutputs( - PCollectionView<String> sideInput, TupleTag<String> additionalOutput) { + private SDFWithSideInput(PCollectionView<String> sideInput) { this.sideInput = sideInput; - this.additionalOutput = additionalOutput; } @ProcessElement public void process(ProcessContext c, OffsetRangeTracker tracker) { checkState(tracker.tryClaim(tracker.currentRestriction().getFrom())); String side = c.sideInput(sideInput); - c.output("main:" + side + ":" + c.element()); - c.output(additionalOutput, "additional:" + side + ":" + c.element()); + c.output(side + ":" + c.element()); } @GetInitialRestriction @@ -242,27 +239,55 @@ public class SplittableDoFnTest { @Test @Category({ValidatesRunner.class, UsesSplittableParDo.class}) - public void testSideInputsAndOutputs() throws Exception { - + public void testSideInput() throws Exception { PCollectionView<String> sideInput = p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton()); - TupleTag<String> mainOutputTag = new TupleTag<>("main"); - TupleTag<String> additionalOutputTag = new TupleTag<>("additional"); + + PCollection<String> res = + p.apply("input", Create.of(0, 1, 2)) + .apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput)); + + PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2")); + + p.run(); + } + + private static class SDFWithAdditionalOutput extends DoFn<Integer, String> { + private final TupleTag<String> additionalOutput; + + private SDFWithAdditionalOutput(TupleTag<String> additionalOutput) { + this.additionalOutput = additionalOutput; + } + + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + checkState(tracker.tryClaim(tracker.currentRestriction().getFrom())); + c.output("main:" + c.element()); + c.output(additionalOutput, "additional:" + c.element()); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(Integer value) { + return new OffsetRange(0, 1); + } + } + + @Test + @Category({ValidatesRunner.class, UsesSplittableParDo.class}) + public void testAdditionalOutput() throws Exception { + TupleTag<String> mainOutputTag = new TupleTag<String>("main") {}; + TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {}; PCollectionTuple res = p.apply("input", Create.of(0, 1, 2)) .apply( - ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag)) - .withSideInputs(sideInput) + ParDo.of(new SDFWithAdditionalOutput(additionalOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); - res.get(mainOutputTag).setCoder(StringUtf8Coder.of()); - res.get(additionalOutputTag).setCoder(StringUtf8Coder.of()); PAssert.that(res.get(mainOutputTag)) - .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2")); + .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2")); PAssert.that(res.get(additionalOutputTag)) - .containsInAnyOrder( - Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2")); + .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2")); p.run(); }
