cozos opened a new issue, #24303:
URL: https://github.com/apache/beam/issues/24303

   ### What happened?
   
   I am running Apache Beam on Spark. My code:
   ```py
    import pyarrow

    from apache_beam import Create, Map, Pipeline
    from apache_beam.io.parquetio import WriteToParquet
    from apache_beam.options.pipeline_options import PipelineOptions
   
   options = PipelineOptions(
       [
           "--runner",  "SparkRunner",
           "--spark_version", "3",
       ]
   )
   
   with Pipeline(options=options) as pipeline:
       (
           pipeline
           | Create(list(range(1000)))
           | Map(lambda x: {"name": str(x), "age": x})
           | WriteToParquet(
               "azfs://mystorageaccount/mycontainer/mypath/",
                pyarrow.schema([("name", pyarrow.binary()), ("age", pyarrow.int64())]),
               file_name_suffix=".parquet",
           )
       )
   ```
   
   The error:
   ```
    Traceback (most recent call last):
      File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      File "apache_beam/runners/common.py", line 1334, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "/databricks/python3/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1229, in <genexpr>
        window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
      File "/databricks/python3/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 279, in finalize_write
        self._check_state_for_finalize_write(writer_results, num_shards))
      File "/databricks/python3/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 263, in _check_state_for_finalize_write
        FileSystems.checksum(src) == FileSystems.checksum(dst)):
      File "/databricks/python3/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 324, in checksum
        return filesystem.checksum(path)
      File "/databricks/python3/lib/python3.7/site-packages/apache_beam/io/azure/blobstoragefilesystem.py", line 277, in checksum
        raise BeamIOError("Checksum operation failed", {path, e})
    apache_beam.io.filesystem.BeamIOError: Checksum operation failed with exceptions {'azfs://mystorageaccount/mycontainer/mypath/beam-temp--9a346cf260e611ed84fc00163eafa21b/59821b95-11a8-43fd-9117-f9b8466f46fd..parquet', BlobStorageError('The specified blob does not exist.', 404)}
   ```
   
   This error only occurs when I have speculative execution enabled; with it disabled, the problem does not happen. It seems to me that `finalize_write` is not idempotent here: one attempt deletes the temp files that another attempt still expects to checksum.
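   For what it's worth, the failure mode is consistent with two speculative attempts racing through the rename-and-checksum step: the second attempt checksums a temp blob that the first attempt already moved. A minimal local-filesystem sketch of an idempotent finalize step that would tolerate this race (`idempotent_finalize` and `_checksum` are hypothetical names for illustration, not Beam's actual code):

    ```py
    import hashlib
    import os


    def _checksum(path):
        # MD5 of file contents, standing in for FileSystems.checksum.
        with open(path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()


    def idempotent_finalize(src, dst):
        """Rename src to dst, tolerating a concurrent attempt that already did it.

        Hypothetical sketch: if src is gone but dst exists, a speculative
        duplicate already finalized this shard, so treat the rename as a
        success instead of raising a "blob does not exist" error.
        """
        if not os.path.exists(src):
            if os.path.exists(dst):
                return "already-finalized"
            raise IOError("neither temp nor final file exists: %s" % src)
        if os.path.exists(dst) and _checksum(src) == _checksum(dst):
            os.remove(src)  # duplicate write of the same shard; keep dst
            return "duplicate-removed"
        os.replace(src, dst)
        return "renamed"
    ```

   Running this twice for the same shard succeeds both times, whereas the current checksum comparison fails on the second attempt once the temp file is gone.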
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: io-py-parquet

