robertwb commented on code in PR #32445:
URL: https://github.com/apache/beam/pull/32445#discussion_r1805113430
##########
sdks/python/apache_beam/examples/snippets/snippets.py:
##########
@@ -1143,6 +1143,39 @@ def model_multiple_pcollections_flatten(contents, output_path):
merged | beam.io.WriteToText(output_path)
+def model_multiple_pcollections_flatten_with(contents, output_path):
+ """Merging a PCollection with FlattenWith."""
+ some_hash_fn = lambda s: ord(s[0])
+ partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
+ import apache_beam as beam
+ with TestPipeline() as pipeline: # Use TestPipeline for testing.
+
+ # Partition into three PCollections
+ partitioned = pipeline | beam.Create(contents) | beam.Partition(
+ partition_fn, 3)
+ pcoll1 = partitioned[0]
+ pcoll2 = partitioned[1]
+ pcoll3 = partitioned[2]
+ SomeTransform = lambda: beam.Map(lambda x: x)
Review Comment:
Added this example and filed https://github.com/apache/beam/issues/32840 for follow-up. It would be good to think about how we could structure things to further reduce redundancy between these various forms of documentation.
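The partition-then-flatten round trip in this snippet can be checked without a runner. The sketch below is a plain-Python model, not the Beam API: lists stand in for PCollections, and the `partition` and `flatten_with` helpers are hypothetical stand-ins for `beam.Partition` and `FlattenWith`. It reuses the snippet's toy hash (`ord` of the first character, modulo 3) and confirms that every element lands in exactly one partition, so flattening the partitions back together recovers the original elements, ignoring order.

```python
from collections import Counter
from typing import Callable, List, Sequence


def some_hash_fn(s: str) -> int:
    # Same toy hash as the snippet: code point of the first character.
    return ord(s[0])


def partition_fn(element: str, num_partitions: int) -> int:
    return some_hash_fn(element) % num_partitions


def partition(elements: Sequence[str],
              fn: Callable[[str, int], int],
              n: int) -> List[List[str]]:
    # Hypothetical model of beam.Partition: route each element to exactly one output.
    outputs: List[List[str]] = [[] for _ in range(n)]
    for e in elements:
        outputs[fn(e, n)].append(e)
    return outputs


def flatten_with(pcoll: List[str], *others: List[str]) -> List[str]:
    # Hypothetical model of FlattenWith: concatenate all inputs, keeping duplicates.
    merged = list(pcoll)
    for other in others:
        merged.extend(other)
    return merged


contents = ["apple", "banana", "cherry", "avocado", "blueberry", "apple"]
pcoll1, pcoll2, pcoll3 = partition(contents, partition_fn, 3)
merged = flatten_with(pcoll1, pcoll2, pcoll3)

# Same elements with the same multiplicities; only the order may differ.
assert Counter(merged) == Counter(contents)
```

With this hash, words starting with "c" (99 % 3 == 0) go to `pcoll1`, "a" (97 % 3 == 1) to `pcoll2`, and "b" (98 % 3 == 2) to `pcoll3`.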
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java:
##########
@@ -82,6 +83,81 @@ public static <T> Iterables<T> iterables() {
return new Iterables<>();
}
+ /**
+ * Returns a {@link PTransform} that flattens the input {@link PCollection} with a given {@link
+ * PCollection}, resulting in a {@link PCollection} containing all the elements of both {@link
+ * PCollection}s as its output.
+ *
+ * <p>This is equivalent to creating a {@link PCollectionList} containing both the input and
+ * {@code other} and then applying {@link #pCollections()}, but has the advantage that it can be
+ * more easily used inline.
+ *
+ * <p>Both {@code PCollection}s must have equal {@link WindowFn}s. The output elements of {@code
+ * Flatten<T>} are in the same windows and have the same timestamps as their corresponding input
+ * elements. The output {@code PCollection} will have the same {@link WindowFn} as both inputs.
+ *
+ * @param other the other PCollection to flatten with the input
+ * @param <T> the type of the elements in the input and output {@code PCollection}s.
+ */
+ public static <T> PTransform<PCollection<T>, PCollection<T>> with(PCollection<T> other) {
+ return new FlattenWithPCollection<>(other);
+ }
+
+ /** Implementation of {@link #with(PCollection)}. */
+ private static class FlattenWithPCollection<T>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+ // We only need to access this at pipeline construction time.
+ private final transient PCollection<T> other;
+
+ public FlattenWithPCollection(PCollection<T> other) {
+ this.other = other;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ return PCollectionList.of(input).and(other).apply(pCollections());
+ }
+
+ @Override
+ public String getKindString() {
+ return "Flatten.With";
Review Comment:
Yeah, I chose this name because it is literally syntactic sugar for the same primitive Flatten operation. (Personally, I'd prefer "disjoint union", but that's probably too obscure, not to mention too late to change now...)
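The behavior discussed in this thread, `Flatten.with(other)` as sugar for flattening `[input, other]`, can be modeled in plain Python. This is a sketch under stated assumptions, not Beam code: `Element` and `ModelPCollection` are hypothetical stand-ins, a WindowFn is represented by a string compared by equality, windows are `(start, end)` tuples, and the mismatch check raises `ValueError` only for illustration. It mirrors the Javadoc contract (equal WindowFns required; timestamps and windows passed through unchanged) and shows why "disjoint union" fits: the output is the multiset sum of the inputs, so duplicates survive, unlike a set union.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass(frozen=True)
class Element:
    value: str
    timestamp: int           # event time, arbitrary units
    window: Tuple[int, int]  # [start, end) of the element's window


@dataclass
class ModelPCollection:
    window_fn: str  # hypothetical stand-in for a WindowFn, compared by equality
    elements: List[Element] = field(default_factory=list)


def flatten(pcolls: List[ModelPCollection]) -> ModelPCollection:
    """Models Flatten.pCollections(): concatenate inputs that share a WindowFn."""
    window_fns = {p.window_fn for p in pcolls}
    if len(window_fns) != 1:
        raise ValueError(f"Inputs have different WindowFns: {sorted(window_fns)}")
    # Elements are copied through as-is, so timestamps and windows are preserved.
    merged = [e for p in pcolls for e in p.elements]
    return ModelPCollection(window_fn=pcolls[0].window_fn, elements=merged)


def flatten_with(input_: ModelPCollection, other: ModelPCollection) -> ModelPCollection:
    """Models Flatten.with(other): pure sugar over flatten([input, other])."""
    return flatten([input_, other])


fixed = "FixedWindows(60)"
left = ModelPCollection(fixed, [Element("a", 5, (0, 60)), Element("b", 70, (60, 120))])
right = ModelPCollection(fixed, [Element("b", 70, (60, 120)), Element("c", 10, (0, 60))])

out = flatten_with(left, right)
values = [e.value for e in out.elements]

# Multiset sum: the duplicate "b" survives, as in the untagged form of a disjoint union.
assert Counter(values) == Counter({"a": 1, "b": 2, "c": 1})
# A set union of the same values would collapse it.
assert set(values) == {"a", "b", "c"}

# Mismatched WindowFns are rejected before anything is merged.
try:
    flatten_with(left, ModelPCollection("GlobalWindows"))
    raise AssertionError("expected a ValueError")
except ValueError:
    pass
```

Keeping `flatten_with` as a one-line delegation, rather than reimplementing the merge, mirrors the reasoning for the `Flatten.With` kind string: there is only one underlying operation.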
--
This is an automated message from the Apache Git Service.