This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c0b23a7ea0b fix output_handler (#26923)
c0b23a7ea0b is described below
commit c0b23a7ea0bd7faf0bdecb2d16e8aed8d8af24a3
Author: liferoad <[email protected]>
AuthorDate: Fri Jun 2 14:27:53 2023 -0400
fix output_handler (#26923)
---
.../runners/direct/sdf_direct_runner.py | 6 ++--
sdks/python/apache_beam/transforms/util_test.py | 34 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index 57510be749d..8f732d83124 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -121,7 +121,7 @@ class PairWithRestrictionFn(beam.DoFn):
def start_bundle(self):
self._invoker = DoFnInvoker.create_invoker(
self._signature,
- output_processor=_NoneShallPassOutputHandler(),
+ output_handler=_NoneShallPassOutputHandler(),
process_invocation=False)
def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
@@ -142,7 +142,7 @@ class SplitRestrictionFn(beam.DoFn):
signature = DoFnSignature(self._do_fn)
self._invoker = DoFnInvoker.create_invoker(
signature,
- output_processor=_NoneShallPassOutputHandler(),
+ output_handler=_NoneShallPassOutputHandler(),
process_invocation=False)
def process(self, element_and_restriction, *args, **kwargs):
@@ -273,7 +273,7 @@ class ProcessFn(beam.DoFn):
self.sdf_invoker = DoFnInvoker.create_invoker(
DoFnSignature(self.sdf),
context=DoFnContext('unused_context'),
- output_processor=self._output_processor,
+ output_handler=self._output_processor,
input_args=args_for_invoker,
input_kwargs=kwargs_for_invoker)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 3051017f65d..50b3ca817db 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -19,10 +19,12 @@
# pytype: skip-file
+import json
import logging
import math
import random
import re
+import tempfile
import time
import unittest
import warnings
@@ -44,6 +46,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
from apache_beam.runners import pipeline_context
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import SortLists
@@ -902,6 +905,37 @@ class GroupIntoBatchesTest(unittest.TestCase):
GroupIntoBatchesTest.BATCH_SIZE))
]))
+ def test_in_global_window_with_text_file(self):
+ # this test will raise asserts since DirectRunner misses this feature:
+ # sdf_direct_runner currently does not support GroupIntoBatches
+ # from bundles of SDF source and will throw this AttributeError
+ with tempfile.NamedTemporaryFile(suffix=".json", mode="w+t") as f:
+ f.write(json.dumps(GroupIntoBatchesTest._create_test_data()))
+ f.flush()
+ with self.assertRaises((RuntimeError, AttributeError)):
+ with TestPipeline() as pipeline:
+ collection = pipeline \
+ | beam.io.ReadFromText(file_pattern=f.name) \
+ | beam.Map(lambda e: json.loads(e)) \
+ | beam.Map(lambda e: (e["key"], e)) \
+ | util.GroupIntoBatches(GroupIntoBatchesTest.BATCH_SIZE)
+ assert collection
+
+ def test_in_global_window_with_synthetic_source(self):
+ # this test will raise asserts since DirectRunner misses this feature:
+ # sdf_direct_runner currently does not support GroupIntoBatches
+ # from bundles of SDF source and will throw this AttributeError
+ with self.assertRaises((RuntimeError, AttributeError)):
+ with beam.Pipeline() as pipeline:
+ _ = (
+ pipeline
+ | beam.io.Read(
+ SyntheticSource({
+ "numRecords": 10, "keySizeBytes": 1, "valueSizeBytes": 1
+ }))
+ | "Group key" >> beam.GroupIntoBatches(2, 1)
+ | beam.Map(print))
+
def test_with_sharded_key_in_global_window(self):
with TestPipeline() as pipeline:
collection = (