robertwb commented on code in PR #30807:
URL: https://github.com/apache/beam/pull/30807#discussion_r1546911384


##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -1644,3 +1645,39 @@ def _process(element):
       yield r
 
     return pcoll | FlatMap(_process)
+
+
[email protected]_input_types(T)
[email protected]_output_types(T)
+class WaitOn(PTransform):
+  """Delays processing of a {@link PCollection} until another set of
+  PCollections has finished being processed. For example::
+
+     X | WaitOn(Y, Z) | SomeTransform()
+
+  would ensure that PCollections Y and Z (and hence their producing transforms)
+  are complete before SomeTransform gets executed on the elements of X.
+  This can be especially useful when the waited-on PCollections are the
+  outputs of transforms that interact with external systems (such as writing
+  to a database or other sink).
+
+  For streaming, this delay is done on a per-window basis, i.e.
+  the corresponding window of each waited-on PCollection is computed before
+  elements of the main PCollection are passed through.
+
+  This barrier often induces a fusion break.
+  """
+  def __init__(self, *to_be_waited_on):
+    self._to_be_waited_on = to_be_waited_on
+
+  def expand(self, pcoll):
+    # All we care about is the watermark, not the data itself.
+    # The GroupByKey avoids writing empty files for each shard, and also
+    # ensures the respective window finishes before advancing the timestamp.
+    sides = [
+        pvalue.AsIter(
+            pcoll
+            | f"WaitFor{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey()))
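
For context, a minimal usage sketch of the transform added above (assuming it
is imported from apache_beam.transforms.util, where this diff defines it; the
step labels and the "setup_done" collection are illustrative only, not part of
the PR):

    import apache_beam as beam
    from apache_beam.transforms.util import WaitOn

    with beam.Pipeline() as p:
      main = p | 'Main' >> beam.Create([1, 2, 3])
      # Hypothetical side-effecting step, standing in for e.g. a database
      # write whose completion must be observed before processing `main`.
      setup_done = (
          p
          | 'Setup' >> beam.Create(['cfg'])
          | 'FakeExternalWrite' >> beam.Map(lambda x: x))

      # Elements of `main` are held back (per window) until `setup_done`,
      # and hence its producing transforms, has finished being computed.
      _ = (
          main
          | 'WaitForSetup' >> WaitOn(setup_done)
          | 'Process' >> beam.Map(lambda x: x * 10))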

Review Comment:
   Good call. Done.


