Rap70r commented on issue #2586:
URL: https://github.com/apache/hudi/issues/2586#issuecomment-782892789
Hello nsivabalan, thank you for getting back to me.
* All Hudi tables are stored in S3 buckets. We use Spark Structured
Streaming to apply incremental updates against S3 Hudi datasets.
* **Stacktrace**
```
org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to
download file path:
s3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet,
range: 0-515243, partition values: [empty row], isDataPresent: false
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:252)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:132)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory
's3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet'
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:473)
at
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
at
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:449)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildPrefetcherWithPartitionValues$1(ParquetFileFormat.scala:492)
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:73)
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
```
* Writer Configs:
```
Output Path: S3 path
hoodie.datasource.write.operation: upsert
parallelism: 3000
hoodie.datasource.write.table.type: COPY_ON_WRITE
hoodie.cleaner.policy: KEEP_LATEST_FILE_VERSIONS
File Version Retained: 1
hoodie.datasource.hive_sync.enable: false
SaveMode: Append
partitionBy: Single Column
```
* Reader Configs:
```
val df = ss.read
.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load("s3://path/to/hudi/table/*/*")
df.createOrReplaceTempView("hudi_table")
```
* At any point of time, this setup has just a single writer.
**To Reproduce**
* Load the dataframe using Hudi:
```
val df = ss.read
.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load("s3://path/to/hudi/table/*/*")
df.createOrReplaceTempView("hudi_table")
```
* Apply time consuming Spark SQL queries against 'hudi_table'
* A different Spark process updates Hudi dataset incrementally.
* After upsert is done, if the time consuming query is still running, it
will crash with below error:
```
org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to
download file path:
s3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet,
range: 0-515243, partition values: [empty row], isDataPresent: false
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:252)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:132)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory
's3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet'
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:473)
at
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
at
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:449)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildPrefetcherWithPartitionValues$1(ParquetFileFormat.scala:492)
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:73)
at
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
```
Let me know if you need further details.
Thank you
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]