Repository: beam
Updated Branches:
  refs/heads/master ba5bee668 -> 8d8f7fa95


[BEAM-3007] Add test which covers PCollection consumed by Flatten and another 
primitive.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0c76522
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0c76522
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0c76522

Branch: refs/heads/master
Commit: b0c765222dd0e01447b1b9631b99a9f289ee0a1d
Parents: ba5bee6
Author: Luke Cwik <[email protected]>
Authored: Mon Oct 2 11:44:22 2017 -0700
Committer: Luke Cwik <[email protected]>
Committed: Tue Oct 3 11:48:11 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/FlattenTest.java | 35 ++++++++++++++++++++
 1 file changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b0c76522/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 5dbe176..0a21716 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -56,7 +56,10 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -314,6 +317,38 @@ public class FlattenTest implements Serializable {
     p.run();
   }
 
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testFlattenMultiplePCollectionsHavingMultipleConsumers() {
+    PCollection<String> input = p.apply(Create.of("AA", "BBB", "CC"));
+    final TupleTag<String> outputEvenLengthTag = new TupleTag<String>() {};
+    final TupleTag<String> outputOddLengthTag = new TupleTag<String>() {};
+
+    PCollectionTuple tuple = input.apply(ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        if (c.element().length() % 2 == 0) {
+          c.output(c.element());
+        } else {
+          c.output(outputOddLengthTag, c.element());
+        }
+      }
+    }).withOutputTags(outputEvenLengthTag, 
TupleTagList.of(outputOddLengthTag)));
+
+    PCollection<String> outputEvenLength = tuple.get(outputEvenLengthTag);
+    PCollection<String> outputOddLength = tuple.get(outputOddLengthTag);
+
+    PCollection<String> outputMerged = PCollectionList.of(outputEvenLength)
+        .and(outputOddLength)
+        .apply(Flatten.<String>pCollections());
+
+    PAssert.that(outputMerged).containsInAnyOrder("AA", "BBB", "CC");
+    PAssert.that(outputEvenLength).containsInAnyOrder("AA", "CC");
+    PAssert.that(outputOddLength).containsInAnyOrder("BBB");
+
+    p.run();
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   @Test

Reply via email to