damccorm commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1093369873
##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -525,6 +525,31 @@ def run(self, test_runner_api='AUTO'):
self.contains_external_transforms = (
ExternalTransformFinder.contains_external_transforms(self))
+ # Find whether RunInference has side inputs enabled. Also check whether
+ # the side input uses the global window with a non-default trigger.
+ run_inference_visitor = RunInferenceVisitor().visit_run_inference(self)
+ self._run_inference_contains_side_input = (
+ run_inference_visitor.contains_run_inference_side_inputs)
+
+ self.run_inference_global_window_non_default_trigger = (
+ run_inference_visitor.contains_global_windows_non_default_trigger)
+
+ if (self._run_inference_contains_side_input and
+ not self._options.view_as(StandardOptions).streaming):
+ raise RuntimeError(
+ "SideInputs to RunInference PTransform is only supported "
Review Comment:
Specifically, because batch mode can completely process precondition stages
before processing dependent stages, the meaning of side-input updates breaks down
(what does updating to the latest version of the model mean when all inferences
happen at the "same time"?). And if the side input does have a streaming source
(which our built-ins will be), the pipeline can hang forever.
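
For reference, here is a minimal sketch (not from this PR) of the supported usage this check guards: a streaming pipeline that feeds model updates to RunInference through a side input. The `model_metadata_pcoll` parameter, the `ModelMetadata` type, the `EchoModelHandler` stub, and the Pub/Sub topic names are assumptions for illustration, not necessarily the PR's exact API surface.

```python
import apache_beam as beam
from apache_beam.ml.inference.base import ModelHandler, ModelMetadata, RunInference
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger, window


class EchoModelHandler(ModelHandler):
  """Stand-in handler; a real pipeline would load and run an actual model."""
  def load_model(self):
    return None

  def run_inference(self, batch, model, inference_args=None):
    return batch


# Side inputs to RunInference require streaming mode, per the check above.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
  # Unbounded stream of "new model available" notifications. The visitor also
  # checks that this side input is globally windowed with a non-default
  # trigger so updates keep firing.
  model_updates = (
      p
      | 'ReadUpdates' >> beam.io.ReadFromPubSub(
          topic='projects/<project>/topics/model-updates')
      | 'ToMetadata' >> beam.Map(
          lambda path: ModelMetadata(model_id=path.decode(), model_name='latest'))
      | 'UpdateWindowing' >> beam.WindowInto(
          window.GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterProcessingTime(60)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING))

  _ = (
      p
      | 'ReadExamples' >> beam.io.ReadFromPubSub(
          topic='projects/<project>/topics/examples')
      | 'Infer' >> RunInference(
          model_handler=EchoModelHandler(),
          model_metadata_pcoll=model_updates))
```

In batch mode the `model_updates` branch would be fully materialized before any inference runs, so "update to the latest model" has no meaning, which is what the RuntimeError above prevents.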