robertwb commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1093598440


##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -525,6 +525,31 @@ def run(self, test_runner_api='AUTO'):
     self.contains_external_transforms = (
         ExternalTransformFinder.contains_external_transforms(self))
 
+    # Finds whether RunInference has side inputs enabled.
+    # Also checks whether the side input window is global and has
+    # non-default triggers.
+    run_inference_visitor = RunInferenceVisitor().visit_run_inference(self)
+    self._run_inference_contains_side_input = (
+        run_inference_visitor.contains_run_inference_side_inputs)
+
+    self.run_inference_global_window_non_default_trigger = (
+        run_inference_visitor.contains_global_windows_non_default_trigger)
+
+    if (self._run_inference_contains_side_input and
+        not self._options.view_as(StandardOptions).streaming):
+      raise RuntimeError(
+          "SideInputs to RunInference PTransform is only supported "

Review Comment:
   > > > > If you do have a streaming source, the pipeline should automatically 
run in streaming mode.
   > 
   > > > AFAIK, this isn't actually true in Python today.
   > 
   > > If that's the case, we should definitely fix this :).
   > 
   > I tend to agree, but IIRC there were concerns about doing this (@tvalentyn 
thoughts?). It may have just been a matter of us not getting around to doing it 
yet. I would at least be worried about our existing IOs behaving correctly if 
we were to change this behavior; I'm not sure how crisp we've been here, and I 
know of at least one (PeriodicImpulse/Sequence) that wasn't correctly marking 
itself as unbounded. That's all solvable though, and probably isn't made worse 
by the change regardless.
   
   Filed https://github.com/apache/beam/issues/25264; we can continue 
discussion there (including whether there are any downsides to this change, 
though I don't see any). If there is undesirable behavior, like hanging, when 
unbounded sources are used, that is entirely orthogonal to RunInference. 
   
   > > Could you clarify a bit more why understanding runner internals is 
required for understanding what happens in the batch case (or, in other words, 
what confusing behavior the user would run into)? I'm not proposing we do away 
with the "easy" mode when the model is known at compile time (for batch or 
streaming), rather that we allow its computation to be deferred in both modes 
if this is explicitly what the user asks for.
   > 
   > If the following things happened:
   > 
   > 1. Pipeline starts with model A
   > 2. 10,000 records arrive for inference + I do some preprocessing
   > 3. Model updates to model B
   > 4. 10,000 records arrive
   > 5. Model updates to model C
   > 6. 1 more record arrives
   > 
   > If I'm in batch mode, all 20,001 records get processed by model C (I 
actually think this depends on the runner, correct me if I'm wrong). If I'm in 
streaming mode, model A and model B each process 10,000 records, model C 
processes 1 record. The streaming case is by far the more useful one, and 
almost definitely the use case that most people are after with this kind of 
feature, but the discrepancy here is odd if you don't know how the model works.
   
   One can't have this scenario in batch. In batch, one has 20,001 records and 
three models. None of the models (or records) is "first," and if anything there 
is ambiguity about which model should be used. (In fact, with AsSingleton, an 
error will generally be raised if there's more than one.)
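
   As a minimal sketch of that AsSingleton behavior (the element names below 
are made up for illustration), a singleton side input whose window contains 
more than one element fails at execution time:

```python
import apache_beam as beam

with beam.Pipeline() as p:
  records = p | 'Records' >> beam.Create(['rec1', 'rec2'])
  # Two "models" land in the same (global) window...
  models = p | 'Models' >> beam.Create(['model_a', 'model_b'])
  # ...so accessing them via AsSingleton raises an error at execution time:
  # a singleton view must contain exactly one element per window.
  _ = records | 'Pair' >> beam.Map(
      lambda r, m: (r, m), m=beam.pvalue.AsSingleton(models))
```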
   
   Now if the models (and records) are appropriately timestamped (this works in 
batch), each record will get the "right" model. Bonus: this makes testing a lot 
easier. 
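
   Roughly, and with made-up names and FixedWindows standing in for whatever 
windowing a real pipeline would use, that pairing looks like this (the same 
code runs in batch or streaming):

```python
import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
  # Records and model updates both carry timestamps (values are made up).
  records = (
      p | 'Records' >> beam.Create([('rec1', 5), ('rec2', 65)])
        | 'TsRecords' >> beam.MapTuple(window.TimestampedValue)
        | 'WinRecords' >> beam.WindowInto(window.FixedWindows(60)))
  models = (
      p | 'Models' >> beam.Create([('model_a', 0), ('model_b', 60)])
        | 'TsModels' >> beam.MapTuple(window.TimestampedValue)
        | 'WinModels' >> beam.WindowInto(window.FixedWindows(60)))
  # Each record sees the single model from its own window: rec1 (t=5)
  # pairs with model_a, rec2 (t=65) with model_b.
  _ = records | 'Pair' >> beam.Map(
      lambda r, m: (r, m), m=beam.pvalue.AsSingleton(models))
```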
   
   > With all that said, I can see cases where the batch use case would be nice 
(e.g. it allows daily pipelines without updating your model manually) - I think 
the framing of "deferred computation" vs "live model updates" helps there, and 
I could get behind allowing this. Though, the idea of a single unchanged 
transform for batch and streaming pipelines still seems fairly unrealistic to 
me because the boundedness of the side input source would need to be changed to 
match the boundedness of the main source in most cases to be useful.
   
   A bounded side input with an unbounded main input is easy to understand (a 
rough sketch follows below), and the other way around can work too (especially 
if one is using timestamped data), though a bit odd. But we shouldn't fall into 
the trap of assuming that RunInference is always going to be at the top level 
(i.e. at the same level as the Reads); it may be used in an even higher-level 
composite, for example. 
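
   For concreteness, a rough sketch of the "bounded side input, unbounded main 
input" shape (TestStream stands in for a real streaming source, the "model" is 
just a string, and exact behavior may vary by runner):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_stream import TestStream

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
  # Bounded side input: a single "model" (a string stand-in here).
  model = p | 'Model' >> beam.Create(['model_v1'])
  # Unbounded main input, simulated with TestStream for the sketch.
  events = p | 'Events' >> TestStream().add_elements(
      ['e1', 'e2']).advance_watermark_to_infinity()
  # The bounded side input is computed once and broadcast to every element
  # of the unbounded main input (both are in the global window).
  _ = events | 'Infer' >> beam.Map(
      lambda e, m: (e, m), m=beam.pvalue.AsSingleton(model))
```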
   
   > My biggest sticking point is still that AFAIK unbounded sources don't 
automatically put pipelines in streaming mode, which leads to unexplained 
pipeline hangs in batch mode for this use case. Whether we change that or not, 
my vote would be to not block this PR on that change.
   
   If using an unbounded side input causes hangs, yes, let's warn on that 
(ideally at as high a level as makes sense, since it's not a 
RunInference-specific issue) until this is fixed. But I'd rather not prohibit 
side inputs altogether. 


