liferoad commented on code in PR #36528:
URL: https://github.com/apache/beam/pull/36528#discussion_r2436407574
##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -205,6 +205,19 @@ def open_writer(self, init_result, uid):
# We also ensure there will be no collisions with uid and a
# (possibly unsharded) file_path_prefix and a (possibly empty)
# file_name_suffix.
+ from apache_beam.pvalue import EmptySideInput
Review Comment:
This should fix the following test failure:
```
=================================== FAILURES ===================================
_______________________ TestParquet.test_sink_transform ________________________
[gw5] linux -- Python 3.13.3
/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/bin/python
self = <apache_beam.io.parquetio_test.TestParquet
testMethod=test_sink_transform>
def test_sink_transform(self):
with TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/io/parquetio_test.py:359:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/pipeline.py:670:
in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/testing/test_pipeline.py:122:
in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult
object at 0x7901b41dc690>
duration = None
def wait_until_finish(self, duration=None):
self.process_method(*args_for_process, **kwargs_for_process),
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/iobase.py",
line 1431, in process
writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/options/value_provider.py",
line 193, in _f
return fnc(self, *args, **kwargs)
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filebasedsink.py",
line 211, in open_writer
writer_path = FileSystems.join(init_result, uid) + suffix
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
line 169, in join
filesystem = FileSystems.get_filesystem(basepath)
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
line 155, in get_filesystem
raise BeamIOError('Unable to get the Filesystem', {path: e})
apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with
exceptions {<apache_beam.pvalue.EmptySideInput object at 0x7901ba06e5d0>:
AttributeError("'EmptySideInput' object has no attribute 'strip'")}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 316, in _execute
response = task()
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 662, in do_instruction
return getattr(self, request_type)(
~~~~~~~~~~~~~~~~~~~~~~~~~~~^
getattr(request, request_type), request.instruction_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 700, in process_bundle
bundle_processor.process_bundle(instruction_id))
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1274, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
element.data)
^^^^^^^^^^^^^
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 237, in process_encoded
self.output(decoded_value)
~~~~~~~~~~~^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 568, in
apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 570, in
apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 260, in
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
File "apache_beam/runners/worker/operations.py", line 263, in
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 951, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 952, in
apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
File "apache_beam/runners/common.py", line 1500, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
File "apache_beam/runners/common.py", line 1609, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
File "apache_beam/runners/common.py", line 1498, in
apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 912, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 1057, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/iobase.py",
line 1431, in process
writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/options/value_provider.py",
line 193, in _f
return fnc(self, *args, **kwargs)
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filebasedsink.py",
line 211, in open_writer
writer_path = FileSystems.join(init_result, uid) + suffix
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
line 169, in join
filesystem = FileSystems.get_filesystem(basepath)
File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
line 155, in get_filesystem
raise BeamIOError('Unable to get the Filesystem', {path: e})
apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with
exceptions {<apache_beam.pvalue.EmptySideInput object at 0x7901ba06e5d0>:
AttributeError("'EmptySideInput' object has no attribute 'strip'")} [while
running 'WriteToParquet/Write/WriteImpl/WriteBundles'] with exceptions None
```
At least, this change should fix the failure in our tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]