darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465074
########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ########## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection<String> containing: "1","1","1","2","3","4","4" * }</pre> */ - public static <T> SetUnionAllImpl<T> unionAll(PCollection<T> rightCollection) { + public static <T> SetImpl<T> unionAll(PCollection<T> rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - - return new SetUnionAllImpl<T>(rightCollection); + return new SetImpl<>(rightCollection, unionAll()); } - private static <T> PCollection<T> performSetOperation( - PCollection<T> leftCollection, - PCollection<T> rightCollection, - SerializableBiFunction<Long, Long, Long> fn) { - - TupleTag<Void> leftCollectionTag = new TupleTag<>(); - TupleTag<Void> rightCollectionTag = new TupleTag<>(); - - MapElements<T, KV<T, Void>> elementToVoid = - MapElements.via( - new SimpleFunction<T, KV<T, Void>>() { - @Override - public KV<T, Void> apply(T element) { - return KV.of(element, null); - } - }); - - PCollection<KV<T, Void>> left = leftCollection.apply("PrepareLeftKV", elementToVoid); - PCollection<KV<T, Void>> right = rightCollection.apply("PrepareRightKV", elementToVoid); - - PCollection<KV<T, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(leftCollectionTag, left) - .and(rightCollectionTag, right) - .apply(CoGroupByKey.create()); - // TODO: lift combiners through the CoGBK. 
- return coGbkResults.apply( - ParDo.of( - new DoFn<KV<T, CoGbkResult>, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { - KV<T, CoGbkResult> elementGroups = c.element(); - - CoGbkResult value = elementGroups.getValue(); - long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); - long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - - T element = elementGroups.getKey(); - for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); - } - } - })); + public static <T> Flatten.PCollections<T> unionAll() { + return Flatten.pCollections(); } public static class SetImpl<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final PCollection<T> rightCollection; - private final SerializableBiFunction<Long, Long, Long> fn; - private SetImpl(PCollection<T> rightCollection, SerializableBiFunction<Long, Long, Long> fn) { - this.rightCollection = rightCollection; - this.fn = fn; + private final transient PCollection<T> right; + private final PTransform<PCollectionList<T>, PCollection<T>> listTransformFn; + + private SetImpl( + PCollection<T> rightCollection, + PTransform<PCollectionList<T>, PCollection<T>> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection<T> expand(PCollection<T> leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final PCollection<T> rightCollection; + public static class SetImplCollections<T> extends PTransform<PCollectionList<T>, PCollection<T>> { + + private final transient SerializableBiFunction<Long, Long, Long> fn; + + private SetImplCollections(SerializableBiFunction<Long, Long, Long> fn) { + this.fn = fn; + } + + private static <T> 
PCollection<T> performSetOperationCollectionList( + PCollectionList<T> inputs, SerializableBiFunction<Long, Long, Long> fn) { + List<PCollection<T>> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { + return inputs.get(0); // Handle only one PCollection in list. Coder is already specified Review comment: Yes. I need this check because I can't set the coder with `setCoder` when there is only one element in the list. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org