darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465074



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##########
@@ -187,83 +237,112 @@
    *     left.apply(SetFns.unionAll(right)); // results will be 
PCollection<String> containing: "1","1","1","2","3","4","4"
    * }</pre>
    */
-  public static <T> SetUnionAllImpl<T> unionAll(PCollection<T> 
rightCollection) {
+  public static <T> SetImpl<T> unionAll(PCollection<T> rightCollection) {
     checkNotNull(rightCollection, "rightCollection argument is null");
-
-    return new SetUnionAllImpl<T>(rightCollection);
+    return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static <T> PCollection<T> performSetOperation(
-      PCollection<T> leftCollection,
-      PCollection<T> rightCollection,
-      SerializableBiFunction<Long, Long, Long> fn) {
-
-    TupleTag<Void> leftCollectionTag = new TupleTag<>();
-    TupleTag<Void> rightCollectionTag = new TupleTag<>();
-
-    MapElements<T, KV<T, Void>> elementToVoid =
-        MapElements.via(
-            new SimpleFunction<T, KV<T, Void>>() {
-              @Override
-              public KV<T, Void> apply(T element) {
-                return KV.of(element, null);
-              }
-            });
-
-    PCollection<KV<T, Void>> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-    PCollection<KV<T, Void>> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-    PCollection<KV<T, CoGbkResult>> coGbkResults =
-        KeyedPCollectionTuple.of(leftCollectionTag, left)
-            .and(rightCollectionTag, right)
-            .apply(CoGroupByKey.create());
-    // TODO: lift combiners through the CoGBK.
-    return coGbkResults.apply(
-        ParDo.of(
-            new DoFn<KV<T, CoGbkResult>, T>() {
-
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                KV<T, CoGbkResult> elementGroups = c.element();
-
-                CoGbkResult value = elementGroups.getValue();
-                long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-                long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-                T element = elementGroups.getKey();
-                for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-                  c.output(element);
-                }
-              }
-            }));
+  public static <T> Flatten.PCollections<T> unionAll() {
+    return Flatten.pCollections();
   }
 
   public static class SetImpl<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
-    private final PCollection<T> rightCollection;
-    private final SerializableBiFunction<Long, Long, Long> fn;
 
-    private SetImpl(PCollection<T> rightCollection, 
SerializableBiFunction<Long, Long, Long> fn) {
-      this.rightCollection = rightCollection;
-      this.fn = fn;
+    private final transient PCollection<T> right;
+    private final PTransform<PCollectionList<T>, PCollection<T>> 
listTransformFn;
+
+    private SetImpl(
+        PCollection<T> rightCollection,
+        PTransform<PCollectionList<T>, PCollection<T>> listTransformFn) {
+      this.right = rightCollection;
+      this.listTransformFn = listTransformFn;
     }
 
     @Override
     public PCollection<T> expand(PCollection<T> leftCollection) {
-      return performSetOperation(leftCollection, rightCollection, fn)
-          .setCoder(leftCollection.getCoder());
+      return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
     }
   }
 
-  public static class SetUnionAllImpl<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
-    private final PCollection<T> rightCollection;
+  public static class SetImplCollections<T> extends 
PTransform<PCollectionList<T>, PCollection<T>> {
+
+    private final transient SerializableBiFunction<Long, Long, Long> fn;
+
+    private SetImplCollections(SerializableBiFunction<Long, Long, Long> fn) {
+      this.fn = fn;
+    }
+
+    private static <T> PCollection<T> performSetOperationCollectionList(
+            PCollectionList<T> inputs, SerializableBiFunction<Long, Long, 
Long> fn) {
+      List<PCollection<T>> all = inputs.getAll();
+      int size = all.size();
+      if (size == 1) {
+        return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified

Review comment:
       Yes. I need to check this, as I can't set the coder using `setCoder` if 
there is only one PCollection in the list.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to