damccorm commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1093577541
##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -525,6 +525,31 @@ def run(self, test_runner_api='AUTO'):
    self.contains_external_transforms = (
        ExternalTransformFinder.contains_external_transforms(self))
+    # Find whether RunInference has side inputs enabled.
+    # Also check whether the side input window is global and has non-default
+    # triggers.
+    run_inference_visitor = RunInferenceVisitor().visit_run_inference(self)
+    self._run_inference_contains_side_input = (
+        run_inference_visitor.contains_run_inference_side_inputs)
+
+    self.run_inference_global_window_non_default_trigger = (
+        run_inference_visitor.contains_global_windows_non_default_trigger)
+
+    if (self._run_inference_contains_side_input and
+        not self._options.view_as(StandardOptions).streaming):
+      raise RuntimeError(
+          "SideInputs to RunInference PTransform is only supported "
Review Comment:
>>> If you do have a streaming source, the pipeline should automatically run
in streaming mode.
>> AFAIK, this isn't actually true in Python today.
> If that's the case, we should definitely fix this :).
I tend to agree, but IIRC there were concerns about doing this (@tvalentyn
thoughts?). It may have just been a matter of us not getting around to doing it
yet. I would at least be worried about our existing IOs behaving correctly if
we were to change this behavior, I'm not sure how crisp we've been here and I
know of at least one (PeriodicImpulse/Sequence) that wasn't correctly marking
itself unbounded. That's all solvable though, and probably isn't made worse by
the change regardless.
> Could you clarify a bit more why understanding runner internals is
required for understanding what happens in the batch case (or, in other words,
what confusing behavior the user would run into)? I'm not proposing we do away
with the "easy" mode when the model is known at compile time (for batch or
streaming), rather that we allow its computation to be deferred in both modes
if this is explicitly what the user asks for.
If the following things happened:
1) Pipeline starts with model A
2) 10,000 records arrive for inference + I do some preprocessing
3) Model updates to model B
4) 10,000 records arrive
5) Model updates to model C
6) 1 more record arrives
If I'm in batch mode, all 20,001 records get processed by model C (I
actually think this depends on the runner, correct me if I'm wrong). If I'm in
streaming mode, model A and model B each process 10,000 records, model C
processes 1 record. The streaming case is by far the more useful one, and
almost definitely the use case that most people are after with this kind of
feature, but the discrepancy here is odd if you don't know how the model works.
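For concreteness, here's a rough sketch of the streaming version of that
scenario. The `model_metadata_pcoll` parameter name, the `ModelMetadata` type,
and all of the source/preprocessing transforms are my assumptions about the
shape of the API this PR is adding, not the final signatures:

```python
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
# Assumed: a ModelMetadata-style type exposed alongside RunInference.
from apache_beam.ml.inference.base import ModelMetadata
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming has to be requested explicitly today (see the note at the end).
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
  # Unbounded side input that emits a new model path whenever one is
  # published (model A, then B, then C in the example above).
  model_updates = (
      p
      | 'WatchModels' >> WatchModelPaths()  # hypothetical unbounded source
      | 'ToMetadata' >> beam.Map(
          lambda path: ModelMetadata(model_id=path, model_name=path)))

  # Unbounded main input; each record is scored with whichever model was
  # current when the record arrived.
  _ = (
      p
      | 'ReadRecords' >> ReadRecords()  # hypothetical unbounded source
      | 'Preprocess' >> beam.Map(preprocess)  # hypothetical preprocessing fn
      | 'RunInference' >> RunInference(
          model_handler,  # hypothetical ModelHandler instance
          model_metadata_pcoll=model_updates))
```

In batch mode the side input is computed once, so every record sees the final
model; in streaming mode it refreshes as new models arrive, which is the
behavior described above.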
With all that said, I can see cases where the batch use case would be nice
(e.g. it allows daily pipelines without updating your model manually) - I think
the framing of "deferred computation" vs "live model updates" helps there, and
I could get behind allowing this. Though, the idea of a single unchanged
transform for batch and streaming pipelines still seems fairly unrealistic to
me because the boundedness of the side input source would need to be changed to
match the boundedness of the main source in most cases to be useful.
My biggest sticking point is still that AFAIK unbounded sources don't
automatically put pipelines in streaming mode, which leads to unexplained
pipeline hangs in batch mode for this use case. Whether we change that or not,
my vote would be to not block this PR on that change.
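Concretely, until that changes, anyone using this feature has to opt in to
streaming mode themselves, either by passing the --streaming flag or by setting
it in code, e.g.:

```python
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
# Explicitly force streaming mode so the unbounded side input doesn't hang
# the pipeline in batch mode.
options.view_as(StandardOptions).streaming = True
```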