TheNeuralBit commented on code in PR #17384: URL: https://github.com/apache/beam/pull/17384#discussion_r863241349
########## sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py: ########## @@ -121,6 +126,148 @@ def test_pardo(self): | beam.Map(lambda e: e + 'x')) assert_that(res, equal_to(['aax', 'bcbcx'])) + def test_batch_pardo(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types( + np.int64) + | beam.ParDo(ArrayMultiplyDoFn()) + | beam.Map(lambda x: x * 3)) + + assert_that(res, equal_to([6, 12, 18])) + + def test_batch_rebatch_pardos(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types( + np.int64) + | beam.ParDo(ArrayMultiplyDoFn()) + | beam.ParDo(ListPlusOneDoFn()) + | beam.Map(lambda x: x * 3)) + + assert_that(res, equal_to([9, 15, 21])) + + def test_batch_pardo_fusion_break(self): + class NormalizeDoFn(beam.DoFn): + @no_type_check + def process_batch( + self, + batch: np.ndarray, + mean: np.float64, + ) -> Iterator[np.ndarray]: + assert isinstance(batch, np.ndarray) + yield batch - mean + + # infer_output_type must be defined (when there's no process method), Review Comment: It does fall back to Any, but in this case I want the element type to be specific since it also represents the element type of the ndarray. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org