TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863241349


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:
##########
@@ -121,6 +126,148 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
+  def test_batch_pardo(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([6, 12, 18]))
+
+  def test_batch_rebatch_pardos(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.ParDo(ListPlusOneDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([9, 15, 21]))
+
+  def test_batch_pardo_fusion_break(self):
+    class NormalizeDoFn(beam.DoFn):
+      @no_type_check
+      def process_batch(
+          self,
+          batch: np.ndarray,
+          mean: np.float64,
+      ) -> Iterator[np.ndarray]:
+        assert isinstance(batch, np.ndarray)
+        yield batch - mean
+
+      # infer_output_type must be defined (when there's no process method),

Review Comment:
   It does fall back to `Any`, but in this case I want the element type to be
specific, since it also represents the element type of the ndarray.
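A minimal sketch of why a specific element type matters here. It assumes, as the comment says, that the element type doubles as the ndarray's element type. `batch_dtype_for` is a hypothetical helper for illustration, not a Beam API. The last lines mirror `NormalizeDoFn.process_batch` from the test: an `int64` batch minus a `float64` mean yields a `float64` batch. So the output element type is neither the input type nor something `Any` can describe.

```python
import typing

import numpy as np


# Hypothetical helper (not part of Beam): picks the ndarray dtype that a batch
# of elements of `element_type` would use.
def batch_dtype_for(element_type) -> np.dtype:
  if element_type is typing.Any:
    # With no specific element type there is no dtype to choose, so the only
    # safe fallback is an object array.
    return np.dtype(object)
  return np.dtype(element_type)


# Mirrors NormalizeDoFn.process_batch: int64 batch minus a float64 mean.
batch = np.array([1, 2, 3], dtype=np.int64)
mean = np.float64(2.0)
normalized = batch - mean  # NumPy promotes the result to float64
```

With `Any`, `batch_dtype_for` can only pick `object`. Declaring `np.float64` matches the dtype that the subtraction actually produces.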



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
