ian-Liaozy commented on code in PR #37221:
URL: https://github.com/apache/beam/pull/37221#discussion_r2692453779


##########
sdks/python/apache_beam/runners/interactive/recording_manager.py:
##########
@@ -886,10 +887,11 @@ def record(
     # Start a pipeline fragment to start computing the PCollections.
     uncomputed_pcolls = set(pcolls).difference(computed_pcolls)
     if uncomputed_pcolls:
-      if not self._wait_for_dependencies(uncomputed_pcolls):
-        raise RuntimeError(
-            'Cannot record because a dependency failed to compute'
-            ' asynchronously.')
+      if wait_for_inputs:
+        if not self._wait_for_dependencies(uncomputed_pcolls):
+          raise RuntimeError(
+              'Cannot record because a dependency failed to compute'
+              ' asynchronously.')

Review Comment:
   Hi Danny, my thinking here is:
   1. Previously, `ib.collect()` would not automatically wait for the background 
caching job to finish. If a user ran `collect()` on a PCollection whose 
dependencies were still computing, they could get empty or partial results 
without warning.
   2. By defaulting `wait_for_inputs=True`, we keep the standard user 
experience consistent: we always wait for upstream dependencies to finish 
before collecting. Adding the `wait_for_inputs` option also aligns with the 
current implementation of `ib.compute()`.
   3. Coming back to the exact code block you are quoting, the 'bad state' only 
happens when the user explicitly sets `wait_for_inputs=False`, requesting to 
bypass the safety checks and synchronization. My thinking is that this delegates 
failure handling to the actual pipeline execution, which seems acceptable.
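   To make the behavior in points 2 and 3 concrete, here is a minimal, 
self-contained sketch of the control flow (not the actual Beam implementation; 
`wait_for_dependencies`, `record`, and the dict-based PCollection stand-ins are 
hypothetical illustrations of the pattern in the diff above):

   ```python
   # Illustrative sketch only: shows how the `wait_for_inputs` flag gates the
   # dependency wait, mirroring the diff above. All names here are stand-ins.

   class Recording:
       """Hypothetical stand-in for the object `record()` returns."""
       def __init__(self, pcolls):
           self.pcolls = pcolls

   def wait_for_dependencies(uncomputed):
       # Stand-in for RecordingManager._wait_for_dependencies: blocks until
       # background computation finishes and returns False on failure.
       return all(dep.get('ok', False) for dep in uncomputed)

   def record(pcolls, computed, wait_for_inputs=True):
       uncomputed = [p for p in pcolls if p['name'] not in computed]
       if uncomputed and wait_for_inputs:
           # Default path: wait for upstream PCollections and surface
           # asynchronous failures eagerly, before recording starts.
           if not wait_for_dependencies(uncomputed):
               raise RuntimeError(
                   'Cannot record because a dependency failed to compute'
                   ' asynchronously.')
       # With wait_for_inputs=False the check is bypassed; any failure is
       # delegated to the actual pipeline execution instead.
       return Recording(pcolls)
   ```

   With `wait_for_inputs=True` (the default) a failed dependency raises 
immediately; with `wait_for_inputs=False` the same input proceeds and failure 
surfaces later, at execution time.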



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
