iht opened a new issue, #32361:
URL: https://github.com/apache/beam/issues/32361
### What happened?
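In Dataflow (and probably other runners too), constructing `GcsIO` inside a `DoFn` fails with the stack trace below. The failure appears to come from `GcsIO.__init__` calling `pipeline_options.get_all_options()`. Re-parsing the flags there hits argparse's "the following arguments are required" error and exits with `SystemExit: 2`. The missing argument is presumably the pipeline's own `required=True` option `--bucket_location`, which is absent from the worker's flags. A small reproduction pipeline follows the trace: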
```
Error message from worker: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1561, in
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 606, in
apache_beam.runners.common.DoFnInvoker.invoke_setup
File "/Users/ihr/projects/gcs_cli_options_reproduction/main.py", line 23,
in setup
File
"/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line
148, in __init__
self.enable_read_bucket_metric = pipeline_options.get_all_options(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
line 336, in get_all_options
known_args, unknown_args = parser.parse_known_args(self._flags)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/argparse.py", line 1907, in
parse_known_args
namespace, args = self._parse_known_args(args, namespace)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/argparse.py", line 2156, in
_parse_known_args
self.error(_('the following arguments are required: %s') %
File
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
line 136, in error
super().error(message)
File "/usr/local/lib/python3.11/argparse.py", line 2640, in error
self.exit(2, _('%(prog)s: error: %(message)s\n') % args)
File "/usr/local/lib/python3.11/argparse.py", line 2627, in exit
_sys.exit(status)
SystemExit: 2
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 311, in _execute
response = task()
^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 386, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 656, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 687, in process_bundle
bundle_processor = self.bundle_processor_cache.get(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 510, in get
processor = bundle_processor.BundleProcessor(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 976, in __init__
op.setup(self.data_sampler)
File "apache_beam/runners/worker/operations.py", line 876, in
apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 926, in
apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/common.py", line 1567, in
apache_beam.runners.common.DoFnRunner.setup
File "apache_beam/runners/common.py", line 1563, in
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 1608, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1561, in
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 606, in
apache_beam.runners.common.DoFnInvoker.invoke_setup
File "/Users/ihr/projects/gcs_cli_options_reproduction/main.py", line 23,
in setup
File
"/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line
148, in __init__
self.enable_read_bucket_metric = pipeline_options.get_all_options(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
line 336, in get_all_options
known_args, unknown_args = parser.parse_known_args(self._flags)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/argparse.py", line 1907, in
parse_known_args
namespace, args = self._parse_known_args(args, namespace)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/argparse.py", line 2156, in
_parse_known_args
self.error(_('the following arguments are required: %s') %
File
"/usr/local/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py",
line 136, in error
super().error(message)
File "/usr/local/lib/python3.11/argparse.py", line 2640, in error
self.exit(2, _('%(prog)s: error: %(message)s\n') % args)
File "/usr/local/lib/python3.11/argparse.py", line 2627, in exit
_sys.exit(status)
RuntimeError: SystemExit: 2 [while running 'Write to GCS-ptransform-51']
```
```python
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class Pipeline_Args(PipelineOptions):
    """Pipeline-specific options for the reproduction job.

    Registers a single mandatory flag, ``--bucket_location``, which run()
    reads and hands to the upload DoFn as its GCS output prefix.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        # NOTE(review): required=True presumably makes any later re-parse of
        # the pipeline flags fail whenever --bucket_location is absent (e.g.
        # the "arguments are required" error raised inside GcsIO) - confirm.
        parser.add_argument(
            '--bucket_location',
            required=True,
            dest='bucket_location',
            help="Just mimicking the customer code",
        )


class UploadFileToGCS(beam.DoFn):
    """Write each ``(data, file_name)`` element to a file under a GCS prefix.

    Yields the full output path of every file that was written successfully.
    Upload failures are logged with their traceback and the element is
    dropped (best-effort), as in the original reproduction.
    """

    def __init__(self, gcs_location):
        # Strip trailing slashes so the join in process() yields exactly one
        # separator between the prefix and the file name.
        self.gcs_location = gcs_location.rstrip("/")
        # Created in setup(); never pickled with the DoFn.
        self._gcsio = None

    def setup(self):
        # Build the GCS client once per DoFn instance rather than once per
        # element. In the reported failure, this GcsIO() construction during
        # DoFn setup is where SystemExit was raised.
        self._gcsio = beam.io.gcsio.GcsIO()

    def process(self, element):
        data, fn = element
        gcs_output_file_location = f"{self.gcs_location}/{fn}"
        # The GCS write stream is binary; text payloads (the pipeline feeds
        # str elements) must be encoded or write() raises TypeError.
        if isinstance(data, str):
            data = data.encode("utf-8")
        try:
            with self._gcsio.open(gcs_output_file_location, mode="w") as handle:
                handle.write(data)
        except Exception:
            # Best-effort upload: keep the pipeline running, but record the
            # full traceback instead of only the exception text.
            logging.exception("Error uploading %s", gcs_output_file_location)
            return
        yield gcs_output_file_location


def run():
    """Construct the reproduction pipeline, launch it and block until done."""
    options = PipelineOptions()
    options.view_as(SetupOptions).save_main_session = True
    # NOTE(review): saving the main session presumably also ships
    # Pipeline_Args (with its required --bucket_location flag) to the
    # workers, where GcsIO re-parses the options - confirm this is what
    # triggers the reported SystemExit.
    output_prefix = options.view_as(Pipeline_Args).bucket_location

    p = beam.Pipeline(options=options)
    _ = (
        p
        | "Synth data" >> beam.Create(["1", "2", "3"])
        | "Add fn" >> beam.Map(lambda x: (x, "tmp/my_file.txt"))
        | "Write to GCS" >> beam.ParDo(UploadFileToGCS(output_prefix))
    )
    p.run().wait_until_finish()


if __name__ == '__main__':
    # Script entry point: build and run the reproduction pipeline.
    run()
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]