liferoad commented on code in PR #36528:
URL: https://github.com/apache/beam/pull/36528#discussion_r2436407574


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -205,6 +205,19 @@ def open_writer(self, init_result, uid):
     # We also ensure there will be no collisions with uid and a
     # (possibly unsharded) file_path_prefix and a (possibly empty)
     # file_name_suffix.
+    from apache_beam.pvalue import EmptySideInput

Review Comment:
   this should fix:
   ```
   =================================== FAILURES 
===================================
   _______________________ TestParquet.test_sink_transform 
________________________
   [gw5] linux -- Python 3.13.3 
/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/bin/python
   
   self = <apache_beam.io.parquetio_test.TestParquet 
testMethod=test_sink_transform>
   
       def test_sink_transform(self):
         with TemporaryDirectory() as tmp_dirname:
           path = os.path.join(tmp_dirname + "tmp_filename")
   >       with TestPipeline() as p:
                ^^^^^^^^^^^^^^
   
   apache_beam/io/parquetio_test.py:359: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/pipeline.py:670:
 in __exit__
       self.result = self.run()
                     ^^^^^^^^^^
   
target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/testing/test_pipeline.py:122:
 in run
       state = result.wait_until_finish(duration=self.timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.runners.portability.portable_runner.PipelineResult 
object at 0x7901b41dc690>
   duration = None
   
       def wait_until_finish(self, duration=None):
       self.process_method(*args_for_process, **kwargs_for_process),
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/iobase.py",
 line 1431, in process
       writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/options/value_provider.py",
 line 193, in _f
       return fnc(self, *args, **kwargs)
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filebasedsink.py",
 line 211, in open_writer
       writer_path = FileSystems.join(init_result, uid) + suffix
                     ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
 line 169, in join
       filesystem = FileSystems.get_filesystem(basepath)
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
 line 155, in get_filesystem
       raise BeamIOError('Unable to get the Filesystem', {path: e})
   apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with 
exceptions {<apache_beam.pvalue.EmptySideInput object at 0x7901ba06e5d0>: 
AttributeError("'EmptySideInput' object has no attribute 'strip'")}
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 316, in _execute
       response = task()
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 390, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 662, in do_instruction
       return getattr(self, request_type)(
              ~~~~~~~~~~~~~~~~~~~~~~~~~~~^
           getattr(request, request_type), request.instruction_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 700, in process_bundle
       bundle_processor.process_bundle(instruction_id))
       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1274, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
           element.data)
           ^^^^^^^^^^^^^
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 237, in process_encoded
       self.output(decoded_value)
       ~~~~~~~~~~~^^^^^^^^^^^^^^^
     File "apache_beam/runners/worker/operations.py", line 568, in 
apache_beam.runners.worker.operations.Operation.output
       def output(self, windowed_value, output_index=0):
     File "apache_beam/runners/worker/operations.py", line 570, in 
apache_beam.runners.worker.operations.Operation.output
       _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
     File "apache_beam/runners/worker/operations.py", line 260, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       def receive(self, windowed_value):
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       self.consumer.process(windowed_value)
     File "apache_beam/runners/worker/operations.py", line 951, in 
apache_beam.runners.worker.operations.DoOperation.process
       with self.scoped_process_state:
     File "apache_beam/runners/worker/operations.py", line 952, in 
apache_beam.runners.worker.operations.DoOperation.process
       delayed_applications = self.dofn_runner.process(o)
     File "apache_beam/runners/common.py", line 1500, in 
apache_beam.runners.common.DoFnRunner.process
       self._reraise_augmented(exn, windowed_value)
     File "apache_beam/runners/common.py", line 1609, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
       raise new_exn
     File "apache_beam/runners/common.py", line 1498, in 
apache_beam.runners.common.DoFnRunner.process
       return self.do_fn_invoker.invoke_process(windowed_value)
     File "apache_beam/runners/common.py", line 912, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
       self._invoke_process_per_window(
     File "apache_beam/runners/common.py", line 1057, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
       self.process_method(*args_for_process, **kwargs_for_process),
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/iobase.py",
 line 1431, in process
       writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/options/value_provider.py",
 line 193, in _f
       return fnc(self, *args, **kwargs)
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filebasedsink.py",
 line 211, in open_writer
       writer_path = FileSystems.join(init_result, uid) + suffix
                     ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
 line 169, in join
       filesystem = FileSystems.get_filesystem(basepath)
     File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-cloud/py313-cloud/lib/python3.13/site-packages/apache_beam/io/filesystems.py",
 line 155, in get_filesystem
       raise BeamIOError('Unable to get the Filesystem', {path: e})
   apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with 
exceptions {<apache_beam.pvalue.EmptySideInput object at 0x7901ba06e5d0>: 
AttributeError("'EmptySideInput' object has no attribute 'strip'")} [while 
running 'WriteToParquet/Write/WriteImpl/WriteBundles'] with exceptions None
   ```
   at least for our tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to