Repository: beam Updated Branches: refs/heads/master ba5bee668 -> 8d8f7fa95
[BEAM-3007] Add test which covers PCollection consumed by Flatten and another primitive. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0c76522 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0c76522 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0c76522 Branch: refs/heads/master Commit: b0c765222dd0e01447b1b9631b99a9f289ee0a1d Parents: ba5bee6 Author: Luke Cwik <[email protected]> Authored: Mon Oct 2 11:44:22 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue Oct 3 11:48:11 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/FlattenTest.java | 35 ++++++++++++++++++++ 1 file changed, 35 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b0c76522/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index 5dbe176..0a21716 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -56,7 +56,10 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; @@ -314,6 +317,38 @@ public class FlattenTest implements Serializable { p.run(); } + @Test + @Category(ValidatesRunner.class) + public void testFlattenMultiplePCollectionsHavingMultipleConsumers() { + PCollection<String> input = p.apply(Create.of("AA", "BBB", "CC")); + final TupleTag<String> outputEvenLengthTag = new TupleTag<String>() {}; + final TupleTag<String> outputOddLengthTag = new TupleTag<String>() {}; + + PCollectionTuple tuple = input.apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().length() % 2 == 0) { + c.output(c.element()); + } else { + c.output(outputOddLengthTag, c.element()); + } + } + }).withOutputTags(outputEvenLengthTag, TupleTagList.of(outputOddLengthTag))); + + PCollection<String> outputEvenLength = tuple.get(outputEvenLengthTag); + PCollection<String> outputOddLength = tuple.get(outputOddLengthTag); + + PCollection<String> outputMerged = PCollectionList.of(outputEvenLength) + .and(outputOddLength) + .apply(Flatten.<String>pCollections()); + + PAssert.that(outputMerged).containsInAnyOrder("AA", "BBB", "CC"); + PAssert.that(outputEvenLength).containsInAnyOrder("AA", "CC"); + PAssert.that(outputOddLength).containsInAnyOrder("BBB"); + + p.run(); + } + ///////////////////////////////////////////////////////////////////////////// @Test
