shunping opened a new pull request, #38426:
URL: https://github.com/apache/beam/pull/38426
The test `TestBigQueryFileLoads.test_reshuffle_before_load_0` is flaky with
the following traceback:
```
E RuntimeError: Pipeline job-007 failed in state FAILED: bundle
inst005 stage-005 failed:Traceback (most recent call last):
E File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/localfilesystem.py",
line 79, in mkdirs
E os.makedirs(path)
E File
"/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/os.py", line 225, in
makedirs
E mkdir(name, mode)
E FileExistsError: [Errno 17] File exists:
'/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/tmp/tmpsbxy358s/bq_load/91debe7ffc444e4789363222a9eb638e/project1.dataset1.table1'
E
E During handling of the above exception, another exception occurred:
E
E Traceback (most recent call last):
E File "apache_beam/runners/common.py", line 1499, in
apache_beam.runners.common.DoFnRunner.process
E return self.do_fn_invoker.invoke_process(windowed_value)
E File "apache_beam/runners/common.py", line 913, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
E self._invoke_process_per_window(
E File "apache_beam/runners/common.py", line 1056, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E self.output_handler.handle_process_outputs(
E File "apache_beam/runners/common.py", line 1674, in
apache_beam.runners.common._OutputHandler.handle_process_outputs
E File "apache_beam/runners/worker/operations.py", line 264, in
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E self.consumer.process(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 955, in
apache_beam.runners.worker.operations.DoOperation.process
E with self.scoped_process_state:
E File "apache_beam/runners/worker/operations.py", line 956, in
apache_beam.runners.worker.operations.DoOperation.process
E delayed_applications = self.dofn_runner.process(o)
E File "apache_beam/runners/common.py", line 1501, in
apache_beam.runners.common.DoFnRunner.process
E self._reraise_augmented(exn, windowed_value)
E File "apache_beam/runners/common.py", line 1589, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
E raise exn
E File "apache_beam/runners/common.py", line 1499, in
apache_beam.runners.common.DoFnRunner.process
E return self.do_fn_invoker.invoke_process(windowed_value)
E File "apache_beam/runners/common.py", line 913, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
E self._invoke_process_per_window(
E File "apache_beam/runners/common.py", line 1056, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E self.output_handler.handle_process_outputs(
E File "apache_beam/runners/common.py", line 1684, in
apache_beam.runners.common._OutputHandler.handle_process_outputs
E self._write_value_to_tag(tag, windowed_value,
watermark_estimator)
E File "apache_beam/runners/common.py", line 1797, in
apache_beam.runners.common._OutputHandler._write_value_to_tag
E self.main_receivers.receive(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 264, in
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E self.consumer.process(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 955, in
apache_beam.runners.worker.operations.DoOperation.process
E with self.scoped_process_state:
E File "apache_beam/runners/worker/operations.py", line 956, in
apache_beam.runners.worker.operations.DoOperation.process
E delayed_applications = self.dofn_runner.process(o)
E File "apache_beam/runners/common.py", line 1501, in
apache_beam.runners.common.DoFnRunner.process
E self._reraise_augmented(exn, windowed_value)
E File "apache_beam/runners/common.py", line 1589, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
E raise exn
E File "apache_beam/runners/common.py", line 1499, in
apache_beam.runners.common.DoFnRunner.process
E return self.do_fn_invoker.invoke_process(windowed_value)
E File "apache_beam/runners/common.py", line 685, in
apache_beam.runners.common.SimpleInvoker.invoke_process
E self.output_handler.handle_process_outputs(
E File "apache_beam/runners/common.py", line 1684, in
apache_beam.runners.common._OutputHandler.handle_process_outputs
E self._write_value_to_tag(tag, windowed_value,
watermark_estimator)
E File "apache_beam/runners/common.py", line 1797, in
apache_beam.runners.common._OutputHandler._write_value_to_tag
E self.main_receivers.receive(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 264, in
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E self.consumer.process(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 955, in
apache_beam.runners.worker.operations.DoOperation.process
E with self.scoped_process_state:
E File "apache_beam/runners/worker/operations.py", line 956, in
apache_beam.runners.worker.operations.DoOperation.process
E delayed_applications = self.dofn_runner.process(o)
E File "apache_beam/runners/common.py", line 1501, in
apache_beam.runners.common.DoFnRunner.process
E self._reraise_augmented(exn, windowed_value)
E File "apache_beam/runners/common.py", line 1610, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
E raise new_exn
E File "apache_beam/runners/common.py", line 1499, in
apache_beam.runners.common.DoFnRunner.process
E return self.do_fn_invoker.invoke_process(windowed_value)
E File "apache_beam/runners/common.py", line 913, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
E self._invoke_process_per_window(
E File "apache_beam/runners/common.py", line 1056, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E self.output_handler.handle_process_outputs(
E File "apache_beam/runners/common.py", line 1674, in
apache_beam.runners.common._OutputHandler.handle_process_outputs
E for result in results:
E File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py",
line 248, in process
E self._destination_to_file_writer[destination] =
_make_new_file_writer(
E File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py",
line 137, in _make_new_file_writer
E fs.FileSystems.mkdirs(directory)
E File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/filesystems.py",
line 201, in mkdirs
E return filesystem.mkdirs(path)
E File
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/localfilesystem.py",
line 81, in mkdirs
E raise IOError(err)
E RuntimeError: OSError: [Errno 17] File exists:
'/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/tmp/tmpsbxy358s/bq_load/91debe7ffc444e4789363222a9eb638e/project1.dataset1.table1'
[while running
'BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']
```
When writing files for BigQuery loads, multiple workers or bundles
processing records for the same destination can run concurrently. They all
share the same file_prefix and construct the exact same destination directory
path.
When two workers concurrently execute:
```
if not fs.FileSystems.exists(directory):
  fs.FileSystems.mkdirs(directory)
```
a race condition can occur: both workers see that the directory does not
exist yet, and both try to create it. Only one worker creates the directory
successfully; the other raises a "File exists" error.
This PR fixes the problem by ignoring the error when the directory already
exists, which is the case in the concurrent-worker scenario.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]