damccorm commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1089229678


##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -199,7 +199,8 @@ For more information, see 
[`KeyedModelHander`](https://beam.apache.org/releases/
 
 When doing a prediction in Apache Beam, the output `PCollection` includes both 
the keys of the input examples and the inferences. Including both these items 
in the output allows you to find the input that determined the predictions.
 
-The `PredictionResult` is a `NamedTuple` object that contains both the input 
and the inferences, named  `example` and  `inference`, respectively. When keys 
are passed with the input data to the RunInference transform, the output 
`PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and 
the `PredictionResult` object. Your pipeline interacts with a 
`PredictionResult` object in steps after the RunInference transform.
+The `PredictionResult` is a `NamedTuple` object that contains both the input, 
inferences, and model_id
+named  `example`,  `inference`, `model_id` respectively. When keys are passed 
with the input data to the RunInference transform, the output `PCollection` 
returns a `Tuple[str, PredictionResult]`, which is the key and the 
`PredictionResult` object. Your pipeline interacts with a `PredictionResult` 
object in steps after the RunInference transform.

Review Comment:
   ```suggestion
   The `PredictionResult` is a `NamedTuple` object that contains both the input 
and the inferences, named  `example` and  `inference`, respectively. When keys 
are passed with the input data to the RunInference transform, the output 
`PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and 
the `PredictionResult` object. Your pipeline interacts with a 
`PredictionResult` object in steps after the RunInference transform.
   ```
   
   Looks like you missed this when removing the website updates.



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -58,6 +65,35 @@ def run_inference(
     for example in batch:
       yield model.predict(example)
 
+  def update_model_path(self, model_path: Optional[str] = None):
+    pass
+
+
+class FakeModelHandlerReturnsPredictionResult(
+    base.ModelHandler[int, base.PredictionResult, FakeModel]):
+  def __init__(self, clock=None, model_id='fake_model_id_default'):
+    self.model_id = model_id
+    self._fake_clock = clock
+
+  def load_model(self):
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    return FakeModel()

Review Comment:
   Right now, this test doesn't actually show that this `load_model` method is 
getting invoked, since it is basically a no-op after the first go-round. Could 
we rewrite this test so that `FakeModel` (or a different fake model class) takes 
in the updated model id here and uses that to modify what it returns for its 
prediction?
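
   Roughly the shape I have in mind (untested sketch; `FakeModelWithId` and the exact field plumbing are illustrative, not something already in the PR):
   ```python
   class FakeModelWithId:
     def __init__(self, model_id):
       self.model_id = model_id

     def predict(self, example: int) -> base.PredictionResult:
       # The prediction carries the id of the model that produced it, so the
       # test can tell whether load_model really ran again after an update.
       return base.PredictionResult(
           example=example, inference=example + 1, model_id=self.model_id)


   class FakeModelHandlerReturnsPredictionResult(
       base.ModelHandler[int, base.PredictionResult, FakeModelWithId]):
     def __init__(self, clock=None, model_id='fake_model_id_default'):
       self.model_id = model_id
       self._fake_clock = clock

     def load_model(self):
       if self._fake_clock:
         self._fake_clock.current_time_ns += 500_000_000  # 500ms
       # Bake the current model id into the model itself on every (re)load.
       return FakeModelWithId(self.model_id)

     def update_model_path(self, model_path=None):
       if model_path:
         self.model_id = model_path

     def run_inference(self, batch, model, inference_args=None):
       for example in batch:
         yield model.predict(example)
   ```
   With that, the side-input test can assert that later predictions come back with `model_id='fake_model_id_1'` instead of the default, which proves the reload actually happened.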



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
 
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get persistent results
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2, # no output due to no SI
+                                   first_ts + 8,  # first window
+                                   first_ts + 22,  # second window
+                                   ])
+
+    sample_side_input_elements = [
+        base.ModelMetdata(
+            model_id='fake_model_id_1', model_name='fake_model_id_1')
+    ]
+
+    class EmitSideInput(beam.DoFn):
+      def process(self, element):
+        for e in element:
+          yield e
+
+    model_handler = FakeModelHandlerReturnsPredictionResult()
+
+    side_input = (
+        test_pipeline
+        |
+        "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)

Review Comment:
   Should we be applying windowing on this PeriodicImpulse? In general our 
guidance is going to be to use the global window, right?
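
   For reference, a minimal sketch of the slowly-updating global-window side input pattern from the Beam docs; the trigger settings here are my assumption about what this test would want, not what the PR currently does:
   ```python
   from apache_beam.transforms import trigger, window

   side_input = (
       test_pipeline
       | "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)
       | beam.Map(lambda x: sample_side_input_elements)
       # Re-window the side input into the global window so the main input
       # always sees the latest firing.
       | "GlobalSideInputWindow" >> beam.WindowInto(
           window.GlobalWindows(),
           trigger=trigger.Repeatedly(trigger.AfterProcessingTime(interval)),
           accumulation_mode=trigger.AccumulationMode.DISCARDING))
   ```
   If we go this route, the trailing `True` (`apply_windowing`) on the PeriodicImpulse probably isn't needed anymore.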



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
 
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)

Review Comment:
   Should we be forcing this to be a streaming pipeline? Without perfect 
windowing, this won't work on Batch (and it doesn't make much sense), right?
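
   Something along these lines, for example (a sketch; I haven't checked which option plumbing this test harness prefers):
   ```python
   from apache_beam.options.pipeline_options import StandardOptions

   test_pipeline = TestPipeline(is_integration_test=True)
   # Pin the test to streaming so the side-input model updates actually flow.
   test_pipeline.get_pipeline_options().view_as(StandardOptions).streaming = True
   ```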



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
 
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)

Review Comment:
   Relatedly, one thing that I think this PR is missing is validation that we 
are only running this in streaming mode. We'll probably need something similar 
to the `ExternalTransformFinder` to find RunInference with model updates at 
pipeline compile time - 
https://github.com/apache/beam/blob/3bf17574c6f18942d1015d5a837af23ae83c4d1a/sdks/python/apache_beam/pipeline.py#L1049
 - or we'll need a way to read pipeline options during construction of this 
transform itself.
   
   It's ok to add that in a separate PR, but we definitely need to do it (and document it).
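
   Very rough sketch of what I mean; the visitor name and the `_model_metadata_pcoll` attribute are guesses at how this PR wires things up, not existing Beam API:
   ```python
   from apache_beam.ml.inference import base
   from apache_beam.pipeline import PipelineVisitor


   class _RunInferenceModelUpdateFinder(PipelineVisitor):
     """Flags any RunInference transform that has a model-update side input."""
     def __init__(self):
       self.found = False

     def visit_transform(self, transform_node):
       transform = transform_node.transform
       if isinstance(transform, base.RunInference) and getattr(
           transform, '_model_metadata_pcoll', None) is not None:
         self.found = True


   # Then, analogous to how ExternalTransformFinder is used around run():
   # from apache_beam.options.pipeline_options import StandardOptions
   # finder = _RunInferenceModelUpdateFinder()
   # pipeline.visit(finder)
   # if finder.found and not options.view_as(StandardOptions).streaming:
   #   raise RuntimeError(
   #       'RunInference with model updates is only supported in streaming mode')
   ```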



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
 
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get persistent results
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2, # no output due to no SI

Review Comment:
   `no output due to no SI` - This does still get an output, though, right, 
since the side input still fires within this window?



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
 
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get persistent results
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2, # no output due to no SI
+                                   first_ts + 8,  # first window
+                                   first_ts + 22,  # second window
+                                   ])
+
+    sample_side_input_elements = [
+        base.ModelMetdata(
+            model_id='fake_model_id_1', model_name='fake_model_id_1')
+    ]
+
+    class EmitSideInput(beam.DoFn):
+      def process(self, element):
+        for e in element:
+          yield e
+
+    model_handler = FakeModelHandlerReturnsPredictionResult()
+
+    side_input = (
+        test_pipeline
+        |
+        "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)
+        | beam.Map(lambda x: sample_side_input_elements)

Review Comment:
   Ideally we would test multiple sets of side input updates
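
   Sketch of one way to do that, assuming the PeriodicImpulse element is its firing timestamp (which is what the existing `beam.Map` lambda receives); the second model id is made up for the test:
   ```python
   sample_side_input_elements = [
       base.ModelMetdata(
           model_id='fake_model_id_1', model_name='fake_model_id_1'),
       base.ModelMetdata(
           model_id='fake_model_id_2', model_name='fake_model_id_2'),
   ]

   side_input = (
       test_pipeline
       | "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)
       # Switch to the second metadata entry partway through the impulse range
       # so the pipeline sees at least two distinct side-input updates.
       | beam.Map(
           lambda ts: sample_side_input_elements[
               0 if ts < first_ts + (last_ts - first_ts) // 2 else 1]))
   ```
   With one element per firing like this, the `EmitSideInput` flattening step wouldn't be needed, and the assertions could check that early windows carry `fake_model_id_1` and later ones `fake_model_id_2`.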



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
