damccorm commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1089306996
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
third_party_model_handler.batch_elements_kwargs()
third_party_model_handler.validate_inference_args({})
+ def test_run_inference_prediction_result_with_model_id(self):
+ examples = [1, 5, 3, 10]
+ expected = [
+ base.PredictionResult(
+ example=example,
+ inference=example + 1,
+ model_id='fake_model_id_default') for example in examples
+ ]
+ with TestPipeline() as pipeline:
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ actual = pcoll | base.RunInference(
+ FakeModelHandlerReturnsPredictionResult())
+ assert_that(actual, equal_to(expected), label='assert:inferences')
+
+ @pytest.mark.it_postcommit
+ def test_run_inference_prediction_result_with_side_input(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+
+ first_ts = math.floor(time.time()) - 30
+ interval = 5
+ main_input_windowing_interval = 7
+
+ # aligning timestamp to get persistent results
+ first_ts = first_ts - (
+ first_ts % (interval * main_input_windowing_interval))
+ last_ts = first_ts + 45
+
+ sample_main_input_elements = ([first_ts - 2, # no output due to no SI
+ first_ts + 8, # first window
+ first_ts + 22, # second window
+ ])
+
+ sample_side_input_elements = [
+ base.ModelMetdata(
+ model_id='fake_model_id_1', model_name='fake_model_id_1')
+ ]
+
+ class EmitSideInput(beam.DoFn):
+ def process(self, element):
+ for e in element:
+ yield e
+
+ model_handler = FakeModelHandlerReturnsPredictionResult()
+
+ side_input = (
+ test_pipeline
+ |
+ "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)
Review Comment:
(If so, this is a restriction we should probably enforce during pipeline
construction)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]