[ 
https://issues.apache.org/jira/browse/BEAM-9825?focusedWorklogId=432340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432340
 ]

ASF GitHub Bot logged work on BEAM-9825:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/20 07:48
            Start Date: 09/May/20 07:48
    Worklog Time Spent: 10m 
      Work Description: darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465074



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##########
@@ -187,83 +237,112 @@
    *     left.apply(SetFns.unionAll(right)); // results will be 
PCollection<String> containing: "1","1","1","2","3","4","4"
    * }</pre>
    */
-  public static <T> SetUnionAllImpl<T> unionAll(PCollection<T> 
rightCollection) {
+  public static <T> SetImpl<T> unionAll(PCollection<T> rightCollection) {
     checkNotNull(rightCollection, "rightCollection argument is null");
-
-    return new SetUnionAllImpl<T>(rightCollection);
+    return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static <T> PCollection<T> performSetOperation(
-      PCollection<T> leftCollection,
-      PCollection<T> rightCollection,
-      SerializableBiFunction<Long, Long, Long> fn) {
-
-    TupleTag<Void> leftCollectionTag = new TupleTag<>();
-    TupleTag<Void> rightCollectionTag = new TupleTag<>();
-
-    MapElements<T, KV<T, Void>> elementToVoid =
-        MapElements.via(
-            new SimpleFunction<T, KV<T, Void>>() {
-              @Override
-              public KV<T, Void> apply(T element) {
-                return KV.of(element, null);
-              }
-            });
-
-    PCollection<KV<T, Void>> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-    PCollection<KV<T, Void>> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-    PCollection<KV<T, CoGbkResult>> coGbkResults =
-        KeyedPCollectionTuple.of(leftCollectionTag, left)
-            .and(rightCollectionTag, right)
-            .apply(CoGroupByKey.create());
-    // TODO: lift combiners through the CoGBK.
-    return coGbkResults.apply(
-        ParDo.of(
-            new DoFn<KV<T, CoGbkResult>, T>() {
-
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                KV<T, CoGbkResult> elementGroups = c.element();
-
-                CoGbkResult value = elementGroups.getValue();
-                long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-                long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-                T element = elementGroups.getKey();
-                for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-                  c.output(element);
-                }
-              }
-            }));
+  public static <T> Flatten.PCollections<T> unionAll() {
+    return Flatten.pCollections();
   }
 
   public static class SetImpl<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
-    private final PCollection<T> rightCollection;
-    private final SerializableBiFunction<Long, Long, Long> fn;
 
-    private SetImpl(PCollection<T> rightCollection, 
SerializableBiFunction<Long, Long, Long> fn) {
-      this.rightCollection = rightCollection;
-      this.fn = fn;
+    private final transient PCollection<T> right;
+    private final PTransform<PCollectionList<T>, PCollection<T>> 
listTransformFn;
+
+    private SetImpl(
+        PCollection<T> rightCollection,
+        PTransform<PCollectionList<T>, PCollection<T>> listTransformFn) {
+      this.right = rightCollection;
+      this.listTransformFn = listTransformFn;
     }
 
     @Override
     public PCollection<T> expand(PCollection<T> leftCollection) {
-      return performSetOperation(leftCollection, rightCollection, fn)
-          .setCoder(leftCollection.getCoder());
+      return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
     }
   }
 
-  public static class SetUnionAllImpl<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
-    private final PCollection<T> rightCollection;
+  public static class SetImplCollections<T> extends 
PTransform<PCollectionList<T>, PCollection<T>> {
+
+    private final transient SerializableBiFunction<Long, Long, Long> fn;
+
+    private SetImplCollections(SerializableBiFunction<Long, Long, Long> fn) {
+      this.fn = fn;
+    }
+
+    private static <T> PCollection<T> performSetOperationCollectionList(
+            PCollectionList<T> inputs, SerializableBiFunction<Long, Long, 
Long> fn) {
+      List<PCollection<T>> all = inputs.getAll();
+      int size = all.size();
+      if (size == 1) {
+        return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified

Review comment:
       Yes. I need to check this, as I can't set the coder using `setCoder` when 
there is only one PCollection in the list.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 432340)
    Remaining Estimate: 90h 20m  (was: 90.5h)
            Time Spent: 5h 40m  (was: 5.5h)

> Transforms for Intersect, IntersectAll, Except, ExceptAll, Union, UnionAll
> --------------------------------------------------------------------------
>
>                 Key: BEAM-9825
>                 URL: https://issues.apache.org/jira/browse/BEAM-9825
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Darshan Jani
>            Assignee: Darshan Jani
>            Priority: Major
>   Original Estimate: 96h
>          Time Spent: 5h 40m
>  Remaining Estimate: 90h 20m
>
> I'd like to propose the following new high-level transforms.
>  * Intersect
> Compute the intersection between elements of two PCollections.
> Given _leftCollection_ and _rightCollection_, this transform returns a 
> collection containing elements that are common to both _leftCollection_ and 
> _rightCollection_.
>  
>  * Except
> Compute the difference between elements of two PCollections.
> Given _leftCollection_ and _rightCollection_, this transform returns a 
> collection containing elements that are in _leftCollection_ but not in 
> _rightCollection_.
>  * Union
> Find the elements that are in either of two PCollections.
> Implement IntersectAll, ExceptAll and UnionAll variants of the transforms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to