[ https://issues.apache.org/jira/browse/BEAM-9825?focusedWorklogId=432340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432340 ]
ASF GitHub Bot logged work on BEAM-9825: ---------------------------------------- Author: ASF GitHub Bot Created on: 09/May/20 07:48 Start Date: 09/May/20 07:48 Worklog Time Spent: 10m Work Description: darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465074 ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ########## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection<String> containing: "1","1","1","2","3","4","4" * }</pre> */ - public static <T> SetUnionAllImpl<T> unionAll(PCollection<T> rightCollection) { + public static <T> SetImpl<T> unionAll(PCollection<T> rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - - return new SetUnionAllImpl<T>(rightCollection); + return new SetImpl<>(rightCollection, unionAll()); } - private static <T> PCollection<T> performSetOperation( - PCollection<T> leftCollection, - PCollection<T> rightCollection, - SerializableBiFunction<Long, Long, Long> fn) { - - TupleTag<Void> leftCollectionTag = new TupleTag<>(); - TupleTag<Void> rightCollectionTag = new TupleTag<>(); - - MapElements<T, KV<T, Void>> elementToVoid = - MapElements.via( - new SimpleFunction<T, KV<T, Void>>() { - @Override - public KV<T, Void> apply(T element) { - return KV.of(element, null); - } - }); - - PCollection<KV<T, Void>> left = leftCollection.apply("PrepareLeftKV", elementToVoid); - PCollection<KV<T, Void>> right = rightCollection.apply("PrepareRightKV", elementToVoid); - - PCollection<KV<T, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(leftCollectionTag, left) - .and(rightCollectionTag, right) - .apply(CoGroupByKey.create()); - // TODO: lift combiners through the CoGBK. 
- return coGbkResults.apply( - ParDo.of( - new DoFn<KV<T, CoGbkResult>, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { - KV<T, CoGbkResult> elementGroups = c.element(); - - CoGbkResult value = elementGroups.getValue(); - long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); - long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - - T element = elementGroups.getKey(); - for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); - } - } - })); + public static <T> Flatten.PCollections<T> unionAll() { + return Flatten.pCollections(); } public static class SetImpl<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final PCollection<T> rightCollection; - private final SerializableBiFunction<Long, Long, Long> fn; - private SetImpl(PCollection<T> rightCollection, SerializableBiFunction<Long, Long, Long> fn) { - this.rightCollection = rightCollection; - this.fn = fn; + private final transient PCollection<T> right; + private final PTransform<PCollectionList<T>, PCollection<T>> listTransformFn; + + private SetImpl( + PCollection<T> rightCollection, + PTransform<PCollectionList<T>, PCollection<T>> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection<T> expand(PCollection<T> leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final PCollection<T> rightCollection; + public static class SetImplCollections<T> extends PTransform<PCollectionList<T>, PCollection<T>> { + + private final transient SerializableBiFunction<Long, Long, Long> fn; + + private SetImplCollections(SerializableBiFunction<Long, Long, Long> fn) { + this.fn = fn; + } + + private static <T> 
PCollection<T> performSetOperationCollectionList( + PCollectionList<T> inputs, SerializableBiFunction<Long, Long, Long> fn) { + List<PCollection<T>> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { + return inputs.get(0); // Handle only one PCollection in list. Coder is already specified Review comment: Yes. I need to check this as i can't set coder using `setCoder` if there is only one element in the list. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 432340) Remaining Estimate: 90h 20m (was: 90.5h) Time Spent: 5h 40m (was: 5.5h) > Transforms for Intersect, IntersectAll, Except, ExceptAll, Union, UnionAll > -------------------------------------------------------------------------- > > Key: BEAM-9825 > URL: https://issues.apache.org/jira/browse/BEAM-9825 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core > Reporter: Darshan Jani > Assignee: Darshan Jani > Priority: Major > Original Estimate: 96h > Time Spent: 5h 40m > Remaining Estimate: 90h 20m > > I'd like to propose the following new high-level transforms. > * Intersect > Compute the intersection between elements of two PCollections. > Given _leftCollection_ and _rightCollection_, this transform returns a > collection containing elements that are common to both _leftCollection_ and > _rightCollection_ > > * Except > Compute the difference between elements of two PCollections. > Given _leftCollection_ and _rightCollection_, this transform returns a > collection containing elements that are in _leftCollection_ but not in > _rightCollection_ > * Union > Find the elements that are in either of two PCollections.
> Implement IntersectAll, ExceptAll and UnionAll variants of transforms. -- This message was sent by Atlassian Jira (v8.3.4#803005)