AnandInguva commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1092052015


##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -58,6 +65,35 @@ def run_inference(
     for example in batch:
       yield model.predict(example)
 
+  def update_model_path(self, model_path: Optional[str] = None):
+    pass
+
+
+class FakeModelHandlerReturnsPredictionResult(
+    base.ModelHandler[int, base.PredictionResult, FakeModel]):
+  def __init__(self, clock=None, model_id='fake_model_id_default'):
+    self.model_id = model_id
+    self._fake_clock = clock
+
+  def load_model(self):
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    return FakeModel()

Review Comment:
   Added 2 models, addition and subtraction, but in a different IT test.
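   
   For readers of the thread, a minimal sketch of the `run_inference` this
   handler would pair with, so each prediction is tagged with the producing
   model's id. The signature and the `model.predict` call mirror the FakeModel
   used elsewhere in this test file; treat it as illustrative, not the exact
   PR code:
   
   ```python
   def run_inference(self, batch, model, inference_args=None):
     # Wrap each raw prediction in a PredictionResult that records which
     # model produced it, so tests can assert on the model id.
     for example in batch:
       yield base.PredictionResult(
           example=example,
           inference=model.predict(example),
           model_id=self.model_id)
   ```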



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -444,6 +528,28 @@ def process(self, batch, inference_args):
 
     return predictions
 
+  def process(
+      self, batch, inference_args, si_model_metadata: Optional[ModelMetadata]):
+    """
+    When side input is enabled:
+      The method checks if the side input model has been updated, and if so,
+      updates the model and runs inference on the batch of data. If the
+      side input is empty or the model has not been updated, the method
+      simply runs inference on the batch of data.
+    """
+    if si_model_metadata and self._enable_side_input_loading:

Review Comment:
   Changed it.
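   
   For context, a rough sketch of the flow that docstring describes;
   `_model_metadata`, `update_model`, and `_run_inference` are illustrative
   names, not necessarily the PR's exact helpers:
   
   ```python
   def process(self, batch, inference_args, si_model_metadata):
     if si_model_metadata and self._enable_side_input_loading:
       if si_model_metadata.model_id != self._model_metadata.model_id:
         # The side input carries a newer model: swap it in before inferring.
         self.update_model(si_model_metadata.model_id)
         self._model_metadata = si_model_metadata
     # Empty side input or unchanged model id: infer with the current model.
     return self._run_inference(batch, inference_args)
   ```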



##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -525,6 +525,14 @@ def run(self, test_runner_api='AUTO'):
     self.contains_external_transforms = (
         ExternalTransformFinder.contains_external_transforms(self))
 
+    self.contains_run_inference_transform = (
+        RunInferenceSideInputFinder.contains_run_inference_transform(self))
+
+    if (self.contains_run_inference_transform and
+        not self._options.view_as(StandardOptions).streaming):
+      raise RuntimeError(

Review Comment:
   I am good with an error too.
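   
   For context, the truncated hunk presumably finishes along these lines (the
   exact error text here is a guess, not the PR's wording):
   
   ```python
   if (self.contains_run_inference_transform and
       not self._options.view_as(StandardOptions).streaming):
     raise RuntimeError(
         'RunInference with a model side input is only supported on '
         'streaming pipelines; pass --streaming to enable it.')
   ```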



##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -525,6 +525,14 @@ def run(self, test_runner_api='AUTO'):
     self.contains_external_transforms = (
         ExternalTransformFinder.contains_external_transforms(self))
 
+    self.contains_run_inference_transform = (
+        RunInferenceSideInputFinder.contains_run_inference_transform(self))
+
+    if (self.contains_run_inference_transform and
+        not self._options.view_as(StandardOptions).streaming):
+      raise RuntimeError(

Review Comment:
   I added two tests: one to check the streaming flag and another to check
   windowing.
   
   For the windowing, do we want to throw the error, or can we emit a warning
   saying `this will lead to improper results`?
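   
   If we prefer the warning route, it could be as small as the sketch below;
   `windowing_is_compatible` is a placeholder for whatever windowing check we
   settle on:
   
   ```python
   import logging
   
   if self.contains_run_inference_transform and not windowing_is_compatible:
     # Don't fail the pipeline; surface the risk to the user instead.
     logging.warning(
         'RunInference side input windowing does not match the main input '
         'windowing; this will lead to improper results.')
   ```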



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
 
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get consistent results across runs
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2, # no output due to no SI

Review Comment:
   Removed this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
