robertwb commented on code in PR #32445:
URL: https://github.com/apache/beam/pull/32445#discussion_r1805113430


##########
sdks/python/apache_beam/examples/snippets/snippets.py:
##########
@@ -1143,6 +1143,39 @@ def model_multiple_pcollections_flatten(contents, 
output_path):
     merged | beam.io.WriteToText(output_path)
 
 
+def model_multiple_pcollections_flatten_with(contents, output_path):
+  """Merging a PCollection with FlattenWith."""
+  some_hash_fn = lambda s: ord(s[0])
+  partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
+  import apache_beam as beam
+  with TestPipeline() as pipeline:  # Use TestPipeline for testing.
+
+    # Partition into deciles
+    partitioned = pipeline | beam.Create(contents) | beam.Partition(
+        partition_fn, 3)
+    pcoll1 = partitioned[0]
+    pcoll2 = partitioned[1]
+    pcoll3 = partitioned[2]
+    SomeTransform = lambda: beam.Map(lambda x: x)

Review Comment:
   Added this example, filed https://github.com/apache/beam/issues/32840 for 
follow-up. It would be good to think about how we could structure things to 
further reduce redundancy between these various forms of documentation. 



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java:
##########
@@ -82,6 +83,81 @@ public static <T> Iterables<T> iterables() {
     return new Iterables<>();
   }
 
+  /**
+   * Returns a {@link PTransform} that flattens the input {@link PCollection} 
with a given a {@link
+   * PCollection} resulting in a {@link PCollection} containing all the 
elements of both {@link
+   * PCollection}s as its output.
+   *
+   * <p>This is equivalent to creating a {@link PCollectionList} containing 
both the input and
+   * {@code other} and then applying {@link #pCollections()}, but has the 
advantage that it can be
+   * more easily used inline.
+   *
+   * <p>Both {@cpde PCollections} must have equal {@link WindowFn}s. The 
output elements of {@code
+   * Flatten<T>} are in the same windows and have the same timestamps as their 
corresponding input
+   * elements. The output {@code PCollection} will have the same {@link 
WindowFn} as both inputs.
+   *
+   * @param other the other PCollection to flatten with the input
+   * @param <T> the type of the elements in the input and output {@code 
PCollection}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> 
with(PCollection<T> other) {
+    return new FlattenWithPCollection<>(other);
+  }
+
+  /** Implementation of {@link #with(PCollection)}. */
+  private static class FlattenWithPCollection<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    // We only need to access this at pipeline construction time.
+    private final transient PCollection<T> other;
+
+    public FlattenWithPCollection(PCollection<T> other) {
+      this.other = other;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return PCollectionList.of(input).and(other).apply(pCollections());
+    }
+
+    @Override
+    public String getKindString() {
+      return "Flatten.With";

Review Comment:
   Yeah, I chose this name because it is literally syntactic sugar for the same 
primitive Flatten operation. (Personally, I'd prefer disjoint union, but that's 
probably to obscure let alone too late to change now...)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to