cozos opened a new issue, #24303:
URL: https://github.com/apache/beam/issues/24303
### What happened?
I am running Apache Beam on Spark. My code:
```py
import pyarrow

from apache_beam import Create, Map, Pipeline
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [
        "--runner", "SparkRunner",
        "--spark_version", "3",
    ]
)

with Pipeline(options=options) as pipeline:
    (
        pipeline
        | Create(list(range(1000)))
        | Map(lambda x: {"name": str(x), "age": x})
        | WriteToParquet(
            "azfs://mystorageaccount/mycontainer/mypath/",
            pyarrow.schema([("name", pyarrow.binary()), ("age", pyarrow.int64())]),
            file_name_suffix=".parquet",
        )
    )
```
The error:
```
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1198, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 718, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 841, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1334, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/databricks/python3/lib/python3.7/site-packages/apache_beam/io/iobase.py",
line 1229, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File
"/databricks/python3/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py",
line 279, in finalize_write
self._check_state_for_finalize_write(writer_results, num_shards))
File
"/databricks/python3/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py",
line 263, in _check_state_for_finalize_write
FileSystems.checksum(src) == FileSystems.checksum(dst)):
File
"/databricks/python3/lib/python3.7/site-packages/apache_beam/io/filesystems.py",
line 324, in checksum
return filesystem.checksum(path)
File
"/databricks/python3/lib/python3.7/site-packages/apache_beam/io/azure/blobstoragefilesystem.py",
line 277, in checksum
raise BeamIOError("Checksum operation failed", {path, e})
apache_beam.io.filesystem.BeamIOError: Checksum operation failed with
exceptions
{'azfs://mystorageaccount/mycontainer/mypath/beam-temp--9a346cf260e611ed84fc00163eafa21b/59821b95-11a8-43fd-9117-f9b8466f46fd..parquet',
BlobStorageError('The specified blob does not exist.', 404)}
```
This error only happens when speculative execution is enabled; with speculative execution disabled, the problem does not occur. It seems to me that `finalize_write` is not idempotent here: it deletes my temp files, so when a speculative attempt finalizes the same shard again, the checksum step fails on a blob that an earlier attempt already removed.
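To make the suspicion concrete, here is a toy model of the race I think speculative execution triggers. Everything in it (`ToyFS`, `finalize`) is made up for illustration and is not Beam's actual implementation; it only mimics the check-then-checksum shape visible in the traceback:
```py
class ToyFS:
    """Made-up in-memory 'filesystem' standing in for the azfs blob store."""

    def __init__(self):
        self.blobs = {"temp/shard-0.parquet": b"data"}

    def exists(self, path):
        return path in self.blobs

    def checksum(self, path):
        # Mirrors BlobStorageFileSystem.checksum failing with a 404
        # ("The specified blob does not exist.") once the blob is gone.
        if path not in self.blobs:
            raise IOError("Checksum operation failed: %s" % path)
        return hash(self.blobs[path])

    def rename(self, src, dst):
        self.blobs[dst] = self.blobs.pop(src)


def finalize(fs, src, dst):
    # Very loose stand-in for _check_state_for_finalize_write: if the
    # destination already holds the same content, skip; otherwise promote
    # the temp file. Not atomic, so a second concurrent attempt can lose.
    if fs.exists(dst) and fs.checksum(src) == fs.checksum(dst):
        return
    fs.rename(src, dst)


fs = ToyFS()
finalize(fs, "temp/shard-0.parquet", "out/shard-0.parquet")  # original task wins
try:
    # Speculative copy finalizes the same shard a second time.
    finalize(fs, "temp/shard-0.parquet", "out/shard-0.parquet")
except IOError as exc:
    print(exc)  # Checksum operation failed: temp/shard-0.parquet
```
Whichever attempt finishes first renames the temp blob away; the second attempt then sees the destination, tries to compare checksums, and `checksum(src)` fails on the now-missing temp file, which looks exactly like the 404 in the traceback above.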
### Issue Priority
Priority: 2
### Issue Component
Component: io-py-parquet