This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c0b23a7ea0b fix output_handler (#26923)
c0b23a7ea0b is described below

commit c0b23a7ea0bd7faf0bdecb2d16e8aed8d8af24a3
Author: liferoad <[email protected]>
AuthorDate: Fri Jun 2 14:27:53 2023 -0400

    fix output_handler (#26923)
---
 .../runners/direct/sdf_direct_runner.py            |  6 ++--
 sdks/python/apache_beam/transforms/util_test.py    | 34 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index 57510be749d..8f732d83124 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -121,7 +121,7 @@ class PairWithRestrictionFn(beam.DoFn):
   def start_bundle(self):
     self._invoker = DoFnInvoker.create_invoker(
         self._signature,
-        output_processor=_NoneShallPassOutputHandler(),
+        output_handler=_NoneShallPassOutputHandler(),
         process_invocation=False)
 
   def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
@@ -142,7 +142,7 @@ class SplitRestrictionFn(beam.DoFn):
     signature = DoFnSignature(self._do_fn)
     self._invoker = DoFnInvoker.create_invoker(
         signature,
-        output_processor=_NoneShallPassOutputHandler(),
+        output_handler=_NoneShallPassOutputHandler(),
         process_invocation=False)
 
   def process(self, element_and_restriction, *args, **kwargs):
@@ -273,7 +273,7 @@ class ProcessFn(beam.DoFn):
     self.sdf_invoker = DoFnInvoker.create_invoker(
         DoFnSignature(self.sdf),
         context=DoFnContext('unused_context'),
-        output_processor=self._output_processor,
+        output_handler=self._output_processor,
         input_args=args_for_invoker,
         input_kwargs=kwargs_for_invoker)
 
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 3051017f65d..50b3ca817db 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -19,10 +19,12 @@
 
 # pytype: skip-file
 
+import json
 import logging
 import math
 import random
 import re
+import tempfile
 import time
 import unittest
 import warnings
@@ -44,6 +46,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.runners import pipeline_context
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import SortLists
@@ -902,6 +905,37 @@ class GroupIntoBatchesTest(unittest.TestCase):
                       GroupIntoBatchesTest.BATCH_SIZE))
           ]))
 
+  def test_in_global_window_with_text_file(self):
+    # this test will raise asserts since DirectRunner misses this feature:
+    # sdf_direct_runner currently does not support GroupIntoBatches
+    # from bundles of SDF source and will throw this AttributeError
+    with tempfile.NamedTemporaryFile(suffix=".json", mode="w+t") as f:
+      f.write(json.dumps(GroupIntoBatchesTest._create_test_data()))
+      f.flush()
+      with self.assertRaises((RuntimeError, AttributeError)):
+        with TestPipeline() as pipeline:
+          collection = pipeline \
+                      | beam.io.ReadFromText(file_pattern=f.name) \
+                      | beam.Map(lambda e: json.loads(e)) \
+                      | beam.Map(lambda e: (e["key"], e)) \
+                      | util.GroupIntoBatches(GroupIntoBatchesTest.BATCH_SIZE)
+          assert collection
+
+  def test_in_global_window_with_synthetic_source(self):
+    # this test will raise asserts since DirectRunner misses this feature:
+    # sdf_direct_runner currently does not support GroupIntoBatches
+    # from bundles of SDF source and will throw this AttributeError
+    with self.assertRaises((RuntimeError, AttributeError)):
+      with beam.Pipeline() as pipeline:
+        _ = (
+            pipeline
+            | beam.io.Read(
+                SyntheticSource({
+                    "numRecords": 10, "keySizeBytes": 1, "valueSizeBytes": 1
+                }))
+            | "Group key" >> beam.GroupIntoBatches(2, 1)
+            | beam.Map(print))
+
   def test_with_sharded_key_in_global_window(self):
     with TestPipeline() as pipeline:
       collection = (

Reply via email to