iht opened a new issue, #32361:
URL: https://github.com/apache/beam/issues/32361

   ### What happened?
   
   In Dataflow (and probably other runners too), when trying to use `GcsIO` 
from a `DoFn`, I get the stack trace shown below. A small reproduction 
pipeline is also included:
   
   ```
   Error message from worker: Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1561, in 
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 606, in 
apache_beam.runners.common.DoFnInvoker.invoke_setup
     File "/Users/ihr/projects/gcs_cli_options_reproduction/main.py", line 23, 
in setup
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 
148, in __init__
       self.enable_read_bucket_metric = pipeline_options.get_all_options(
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
 line 336, in get_all_options
       known_args, unknown_args = parser.parse_known_args(self._flags)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/argparse.py", line 1907, in 
parse_known_args
       namespace, args = self._parse_known_args(args, namespace)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/argparse.py", line 2156, in 
_parse_known_args
       self.error(_('the following arguments are required: %s') %
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
 line 136, in error
       super().error(message)
     File "/usr/local/lib/python3.11/argparse.py", line 2640, in error
       self.exit(2, _('%(prog)s: error: %(message)s\n') % args)
     File "/usr/local/lib/python3.11/argparse.py", line 2627, in exit
       _sys.exit(status)
   SystemExit: 2
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 311, in _execute
       response = task()
                  ^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 386, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 656, in do_instruction
       return getattr(self, request_type)(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 687, in process_bundle
       bundle_processor = self.bundle_processor_cache.get(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 510, in get
       processor = bundle_processor.BundleProcessor(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 976, in __init__
       op.setup(self.data_sampler)
     File "apache_beam/runners/worker/operations.py", line 876, in 
apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/worker/operations.py", line 926, in 
apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/common.py", line 1567, in 
apache_beam.runners.common.DoFnRunner.setup
     File "apache_beam/runners/common.py", line 1563, in 
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 1608, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1561, in 
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 606, in 
apache_beam.runners.common.DoFnInvoker.invoke_setup
     File "/Users/ihr/projects/gcs_cli_options_reproduction/main.py", line 23, 
in setup
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 
148, in __init__
       self.enable_read_bucket_metric = pipeline_options.get_all_options(
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
 line 336, in get_all_options
       known_args, unknown_args = parser.parse_known_args(self._flags)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/argparse.py", line 1907, in 
parse_known_args
       namespace, args = self._parse_known_args(args, namespace)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/argparse.py", line 2156, in 
_parse_known_args
       self.error(_('the following arguments are required: %s') %
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
 line 136, in error
       super().error(message)
     File "/usr/local/lib/python3.11/argparse.py", line 2640, in error
       self.exit(2, _('%(prog)s: error: %(message)s\n') % args)
     File "/usr/local/lib/python3.11/argparse.py", line 2627, in exit
       _sys.exit(status)
   RuntimeError: SystemExit: 2 [while running 'Write to GCS-ptransform-51']
   ```
   
   ```python
   import logging
   
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions, 
SetupOptions
   
   
   class Pipeline_Args(PipelineOptions):
     @classmethod
     def _add_argparse_args(cls, parser):
       parser.add_argument(
         '--bucket_location',
         dest='bucket_location',
         required=True,
         help="Just mimicking the customer code")
   
   
   class UploadFileToGCS(beam.DoFn):
     def __init__(self, gcs_location):
       self.gcs_location = gcs_location.rstrip("/")
   
     def process(self, element):
       data, fn = element
       gcs_output_file_location = f"{self.gcs_location}/{fn}"
       try:
         with beam.io.gcsio.GcsIO().open(gcs_output_file_location, mode="w") as 
file:
           file.write(data)
         yield gcs_output_file_location
       except Exception as e:
         logging.error(f"Error uploading {gcs_output_file_location}: {e}")
   
   
   def run():
     beam_options = PipelineOptions()
     beam_options.view_as(SetupOptions).save_main_session = True
     opts_job_args = beam_options.view_as(Pipeline_Args)
   
     bucket_location = opts_job_args.bucket_location
   
     pipeline = beam.Pipeline(options=beam_options)
   
     some_data = pipeline | "Synth data" >> beam.Create(["1", "2", "3"])
     with_fn = some_data | "Add fn" >> beam.Map(lambda x: (x, 
"tmp/my_file.txt"))
     write_gcs = with_fn | "Write to GCS" >> 
beam.ParDo(UploadFileToGCS(bucket_location))
   
     pipeline.run().wait_until_finish()
   
   
   if __name__ == '__main__':
     run()
   ```
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to