damccorm commented on code in PR #37221:
URL: https://github.com/apache/beam/pull/37221#discussion_r2694094232
##########
sdks/python/apache_beam/runners/interactive/recording_manager.py:
##########
@@ -886,10 +887,11 @@ def record(
     # Start a pipeline fragment to start computing the PCollections.
     uncomputed_pcolls = set(pcolls).difference(computed_pcolls)
     if uncomputed_pcolls:
-      if not self._wait_for_dependencies(uncomputed_pcolls):
-        raise RuntimeError(
-            'Cannot record because a dependency failed to compute'
-            ' asynchronously.')
+      if wait_for_inputs:
+        if not self._wait_for_dependencies(uncomputed_pcolls):
+          raise RuntimeError(
+              'Cannot record because a dependency failed to compute'
+              ' asynchronously.')
Review Comment:
> By defaulting wait_for_inputs=True, we ensure the standard user experience
> is consistent: we always wait for upstream dependencies to finish before
> collecting. Also, adding wait_for_inputs option will align with current
> implementation for ib.compute()
We are not changing the default behavior in this PR. So I don't think this
is doing what you think it is doing.
> Back to the exact code block that you are quoting, the 'bad state' happens
> when user decide to set wait_for_inputs=False, explicitly request to bypass the
> safety checks and synchronization. My thinking is it will delegate the failure
> handling to actual pipeline execution, which seems acceptable
When is this desirable? It seems like it is always a bad outcome.
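
The control flow being debated can be sketched as follows. This is a minimal illustration, not Beam's actual `RecordingManager`: the class name, constructor, and `_wait_for_dependencies` body here are stand-ins, and only the shape of the `wait_for_inputs` gate mirrors the diff above.

```python
class RecordingManagerSketch:
  """Hypothetical stand-in for the recording manager under discussion."""
  def __init__(self, computed_pcolls, failing_pcolls=()):
    self._computed = set(computed_pcolls)
    self._failing = set(failing_pcolls)

  def _wait_for_dependencies(self, pcolls):
    # Stand-in: pretend to block until upstream computations finish and
    # return False if any dependency failed to compute asynchronously.
    return not (set(pcolls) & self._failing)

  def record(self, pcolls, wait_for_inputs=True):
    uncomputed = set(pcolls) - self._computed
    if uncomputed and wait_for_inputs:
      # Default path: block on upstream dependencies and surface any
      # failure eagerly, before starting the pipeline fragment.
      if not self._wait_for_dependencies(uncomputed):
        raise RuntimeError(
            'Cannot record because a dependency failed to compute'
            ' asynchronously.')
    # With wait_for_inputs=False the check is skipped entirely, and any
    # failure is delegated to the actual pipeline execution -- the
    # "bad state" debated above.
    return sorted(uncomputed)
```

With `wait_for_inputs=True` (the default, unchanged by the PR), a failed dependency raises before anything runs; with `wait_for_inputs=False`, the same call proceeds and the failure surfaces later, if at all.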
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]