AnandInguva commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1089554815
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
     third_party_model_handler.batch_elements_kwargs()
     third_party_model_handler.validate_inference_args({})
+  def test_run_inference_prediction_result_with_model_id(self):
+    examples = [1, 5, 3, 10]
+    expected = [
+        base.PredictionResult(
+            example=example,
+            inference=example + 1,
+            model_id='fake_model_id_default') for example in examples
+    ]
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandlerReturnsPredictionResult())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  @pytest.mark.it_postcommit
+  def test_run_inference_prediction_result_with_side_input(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
Review Comment:
I added `RunInferenceSideInputFinder`, which checks the
`_enable_side_input_loading` bool. I think it is the simpler way to go, as
you suggested. Thanks.
If `_enable_side_input_loading` is enabled and the streaming flag is not
specified, we fail the pipeline during the construction phase.
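
For context, here is a minimal sketch of the kind of construction-time check
described above, built on Beam's `PipelineVisitor`. The class name and the
`_enable_side_input_loading` attribute come from this discussion, but the body
and the `check_side_input_loading` helper are illustrative assumptions, not
the exact implementation in this PR:

```python
from apache_beam.ml.inference import base
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.pipeline import PipelineVisitor


class RunInferenceSideInputFinder(PipelineVisitor):
  """Flags pipelines containing a RunInference with side input model loading.

  Sketch only; the attribute and method details are assumptions.
  """
  def __init__(self):
    self.side_input_loading_used = False

  def _check(self, transform_node):
    transform = transform_node.transform
    if isinstance(transform, base.RunInference) and getattr(
        transform, '_enable_side_input_loading', False):
      self.side_input_loading_used = True

  # RunInference is a composite transform, so it is reported through
  # enter_composite_transform; visit_transform covers leaf transforms.
  def enter_composite_transform(self, transform_node):
    self._check(transform_node)

  def visit_transform(self, transform_node):
    self._check(transform_node)


def check_side_input_loading(pipeline, options):
  """Fails at construction time if side input loading is used in batch mode."""
  finder = RunInferenceSideInputFinder()
  pipeline.visit(finder)
  if finder.side_input_loading_used and not options.view_as(
      StandardOptions).streaming:
    raise RuntimeError(
        'Model updates via side inputs in RunInference require --streaming.')
```

Running the check via `pipeline.visit()` before `run()` surfaces the
misconfiguration while the graph is being built, instead of failing later on
the workers.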
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]