damccorm commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1089229678
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -199,7 +199,8 @@ For more information, see [`KeyedModelHander`](https://beam.apache.org/releases/
 When doing a prediction in Apache Beam, the output `PCollection` includes both the keys of the input examples and the inferences. Including both these items in the output allows you to find the input that determined the predictions.

-The `PredictionResult` is a `NamedTuple` object that contains both the input and the inferences, named `example` and `inference`, respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform.
+The `PredictionResult` is a `NamedTuple` object that contains both the input, inferences, and model_id
+named `example`, `inference`, `model_id` respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform.

Review Comment:
```suggestion
The `PredictionResult` is a `NamedTuple` object that contains both the input and the inferences, named `example` and `inference`, respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform.
```
Looks like you missed this when removing website updates

##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -58,6 +65,35 @@ def run_inference(
     for example in batch:
       yield model.predict(example)

+  def update_model_path(self, model_path: Optional[str] = None):
+    pass
+
+
+class FakeModelHandlerReturnsPredictionResult(
+    base.ModelHandler[int, base.PredictionResult, FakeModel]):
+  def __init__(self, clock=None, model_id='fake_model_id_default'):
+    self.model_id = model_id
+    self._fake_clock = clock
+
+  def load_model(self):
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    return FakeModel()

Review Comment:
Right now, this test doesn't actually show that this load_model method is getting invoked, since this is basically a no-op after the first go-round. Could we rewrite this test so that FakeModel (or a different fake model class) takes in the updated model id here and uses that to modify what it returns for its prediction?
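As a rough illustration of the suggestion in the comment above: a fake model could carry the model id it was loaded with and fold it into its prediction, so a test can observe that `load_model` really ran again after an update. This is a hedged sketch only, building on the `PredictionResult`/`ModelHandler` shapes shown in the PR diff; names like `FakeModelByModelId` and `FakeModelHandlerUsesModelId` are hypothetical and not part of the PR.

```python
# Hypothetical sketch (not from the PR): a fake model whose output depends on
# the model id it was loaded with, making a model reload observable in tests.
from typing import Iterable, Optional, Sequence

from apache_beam.ml.inference import base


class FakeModelByModelId:
  def __init__(self, model_id: str):
    self.model_id = model_id

  def predict(self, example: int) -> int:
    # A stale model and a freshly loaded one produce different outputs.
    return example + (1 if self.model_id == 'fake_model_id_default' else 100)


class FakeModelHandlerUsesModelId(
    base.ModelHandler[int, base.PredictionResult, FakeModelByModelId]):
  def __init__(self, model_id: str = 'fake_model_id_default'):
    self._model_id = model_id

  def load_model(self) -> FakeModelByModelId:
    # Builds the model from whatever id update_model_path last recorded.
    return FakeModelByModelId(self._model_id)

  def update_model_path(self, model_path: Optional[str] = None):
    if model_path:
      self._model_id = model_path

  def run_inference(
      self,
      batch: Sequence[int],
      model: FakeModelByModelId,
      inference_args=None) -> Iterable[base.PredictionResult]:
    for example in batch:
      yield base.PredictionResult(
          example=example,
          inference=model.predict(example),
          model_id=model.model_id)
```

With a handler along these lines, the test could assert that results emitted after the side-input update carry both the new `model_id` and the new increment, which is what actually demonstrates that `load_model` was invoked again.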
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})

+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get persistent results
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2,  # no output due to no SI
+                                   first_ts + 8,  # first window
+                                   first_ts + 22,  # second window
+                                   ])
+
+    sample_side_input_elements = [
+        base.ModelMetdata(
+            model_id='fake_model_id_1', model_name='fake_model_id_1')
+    ]
+
+    class EmitSideInput(beam.DoFn):
+      def process(self, element):
+        for e in element:
+          yield e
+
+    model_handler = FakeModelHandlerReturnsPredictionResult()
+
+    side_input = (
+        test_pipeline
+        |
+        "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)

Review Comment:
Should we be applying windowing on this PeriodicImpulse? In general our guidance is going to be to use the global window, right?

##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})

+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)

Review Comment:
Should we be forcing this to be a streaming pipeline? Without perfect windowing, this won't work on Batch (and it doesn't make much sense), right?
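To make the two comments above concrete, here is a hedged sketch of what keeping the side input in the global window and forcing the test into streaming mode could look like. The trigger settings and label names are assumptions for illustration, not the PR's code; the timestamp setup mirrors the test shown in the diff, and `ModelMetdata` is spelled as in the PR's class name.

```python
# Hedged sketch, not the PR's code: a globally windowed, periodically
# updated side input on a streaming test pipeline.
import math
import time

import apache_beam as beam
from apache_beam.ml.inference import base
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse

interval = 5
first_ts = math.floor(time.time()) - 30
first_ts -= first_ts % (interval * 7)  # align as in the test above
last_ts = first_ts + 45

sample_side_input_elements = [
    base.ModelMetdata(
        model_id='fake_model_id_1', model_name='fake_model_id_1')
]

test_pipeline = TestPipeline(is_integration_test=True)
# Force streaming: periodically updated side inputs only make sense there.
test_pipeline.get_pipeline_options().view_as(StandardOptions).streaming = True

side_input = (
    test_pipeline
    | 'PeriodicImpulse' >> PeriodicImpulse(first_ts, last_ts, interval, True)
    | 'MapToSideInput' >> beam.Map(lambda x: sample_side_input_elements)
    # Re-window the updates into the global window with a per-element
    # trigger, so each new value replaces the previous one instead of being
    # confined to a fixed window.
    | 'GlobalWindow' >> beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterCount(1)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING))
```

`AfterCount(1)` with discarding accumulation is just one way to get "latest value wins" behavior in the global window; the exact trigger choice is a decision for the PR rather than something mandated here.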
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})

+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)

Review Comment:
Relatedly, one thing that I think this PR is missing is validation that we are only running this in streaming mode. We'll probably need something similar to the `ExternalTransformFinder` to find RunInference with model updates at pipeline compile time - https://github.com/apache/beam/blob/3bf17574c6f18942d1015d5a837af23ae83c4d1a/sdks/python/apache_beam/pipeline.py#L1049 - or we'll need a way to read pipeline options during construction of this transform itself. It's ok to add that in a separate PR, but we definitely need to do it (and document it)

##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})

+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get persistent results
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2,  # no output due to no SI

Review Comment:
`no output due to no SI` - This does still get an output, though, right? Since the side input still fires within this window
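The streaming-only validation described in the first comment above could plausibly look something like the sketch below: a `PipelineVisitor`, in the spirit of `ExternalTransformFinder`, that fails when a RunInference transform carrying model updates is used outside streaming mode. The attribute `_model_metadata_pcoll` and the helper name are hypothetical placeholders for however the transform ends up exposing its side-input configuration.

```python
# Rough sketch only: a compile-time check that RunInference-with-model-updates
# is used only on streaming pipelines. Attribute names are hypothetical.
from apache_beam.ml.inference.base import RunInference
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.pipeline import PipelineVisitor


class _ModelUpdatesFinder(PipelineVisitor):
  """Flags RunInference transforms configured with a model-update side input."""
  def __init__(self):
    self.found = False

  def visit_transform(self, applied_ptransform):
    transform = applied_ptransform.transform
    if isinstance(transform, RunInference) and getattr(
        transform, '_model_metadata_pcoll', None) is not None:
      self.found = True


def validate_model_updates_require_streaming(pipeline):
  """Raises if model updates are requested on a non-streaming pipeline."""
  finder = _ModelUpdatesFinder()
  pipeline.visit(finder)
  streaming = pipeline.options.view_as(StandardOptions).streaming
  if finder.found and not streaming:
    raise ValueError(
        'RunInference model updates via side inputs require the pipeline '
        'to run in streaming mode (--streaming).')
```

Whether such a check lives in a pipeline visitor like this or inside the transform's own `expand` (where the pipeline, and hence its options, is reachable from the input PCollection) is exactly the open question raised in the comment.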
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})

+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    first_ts = math.floor(time.time()) - 30
+    interval = 5
+    main_input_windowing_interval = 7
+
+    # aligning timestamp to get persistent results
+    first_ts = first_ts - (
+        first_ts % (interval * main_input_windowing_interval))
+    last_ts = first_ts + 45
+
+    sample_main_input_elements = ([first_ts - 2,  # no output due to no SI
+                                   first_ts + 8,  # first window
+                                   first_ts + 22,  # second window
+                                   ])
+
+    sample_side_input_elements = [
+        base.ModelMetdata(
+            model_id='fake_model_id_1', model_name='fake_model_id_1')
+    ]
+
+    class EmitSideInput(beam.DoFn):
+      def process(self, element):
+        for e in element:
+          yield e
+
+    model_handler = FakeModelHandlerReturnsPredictionResult()
+
+    side_input = (
+        test_pipeline
+        |
+        "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)
+        | beam.Map(lambda x: sample_side_input_elements)

Review Comment:
Ideally we would test multiple sets of side input updates
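One hedged way to exercise multiple side-input updates, as the final comment asks, is to derive a distinct `ModelMetdata` (spelling as in the PR diff) from each `PeriodicImpulse` tick instead of mapping every tick to the same single-element list. The helper below is illustrative only; it assumes, as in the test above, that `first_ts` and `interval` are plain seconds so each impulse element is its numeric firing timestamp, and the versioning scheme is an invention for the sketch.

```python
# Illustrative sketch: emit a new model id on every impulse tick so that
# successive windows of the main input can be asserted against different
# model versions. Names and the versioning scheme are assumptions.
import apache_beam as beam
from apache_beam.ml.inference import base
from apache_beam.transforms.periodicsequence import PeriodicImpulse


def versioned_model_metadata(pipeline, first_ts, last_ts, interval):
  """Returns a PCollection emitting a new ModelMetdata per impulse tick."""
  def metadata_for_tick(ts):
    # fake_model_id_1 for the first tick, fake_model_id_2 for the next, ...
    version = int(ts - first_ts) // interval + 1
    model_id = 'fake_model_id_%d' % version
    return base.ModelMetdata(model_id=model_id, model_name=model_id)

  return (
      pipeline
      | 'PeriodicImpulse' >> PeriodicImpulse(first_ts, last_ts, interval, True)
      | 'VersionedMetadata' >> beam.Map(metadata_for_tick))
```

In the test above this would replace the `beam.Map(lambda x: sample_side_input_elements)` step (and remove the need for the `EmitSideInput` flattening), e.g. `side_input = versioned_model_metadata(test_pipeline, first_ts, last_ts, interval)`; the expected `PredictionResult.model_id` values would then differ between the first and second window, which is what actually shows the updates flowing through.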
