Rap70r edited a comment on issue #2586:
URL: https://github.com/apache/hudi/issues/2586#issuecomment-782892789
Hello nsivabalan, thank you for getting back to me.
* All Hudi tables are stored in S3 buckets. We use Spark Structured
Streaming to apply incremental updates against S3 Hudi datasets.
* **Stacktrace**
```
org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to download file path: s3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet, range: 0-515243, partition values: [empty row], isDataPresent: false
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:252)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:132)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory 's3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:473)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:449)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildPrefetcherWithPartitionValues$1(ParquetFileFormat.scala:492)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:73)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
```
* Writer Configs:
```
Output Path: S3 path
hoodie.datasource.write.operation: upsert
parallelism: 3000
hoodie.datasource.write.table.type: COPY_ON_WRITE
hoodie.cleaner.policy: KEEP_LATEST_FILE_VERSIONS
File Version Retained: 1
hoodie.datasource.hive_sync.enable: false
SaveMode: Append
partitionBy: Single Column
```
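For context, here is a rough sketch of how the writer job wires these options together (the table name, record key, precombine and partition columns, and the output path are placeholders; "parallelism" maps to `hoodie.upsert.shuffle.parallelism` and "File Version Retained" to `hoodie.cleaner.fileversions.retained` in this sketch):
```
import org.apache.spark.sql.SaveMode

// Placeholder column names; the real job uses our own key/precombine/partition fields.
inputDF.write
  .format("org.apache.hudi")
  .option("hoodie.table.name", "table_name")
  .option("hoodie.datasource.write.recordkey.field", "some_key")
  .option("hoodie.datasource.write.precombine.field", "some_ts")
  .option("hoodie.datasource.write.partitionpath.field", "some_partition_column")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.upsert.shuffle.parallelism", "3000")
  .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
  .option("hoodie.cleaner.fileversions.retained", "1")
  .option("hoodie.datasource.hive_sync.enable", "false")
  .mode(SaveMode.Append)
  .save("s3://bucket_name/folder_name/table_name")
```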
* Reader Configs:
```
val df = ss.read
  .format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("s3://path/to/hudi/table/*/*")
df.createOrReplaceTempView("hudi_table")
```
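The queries themselves vary, but they are long-running aggregations over the registered view; a hypothetical example (column names are placeholders):
```
// Hypothetical long-running query against the snapshot view registered above.
val result = ss.sql(
  """
    |SELECT some_partition_column, COUNT(*) AS row_count
    |FROM hudi_table
    |GROUP BY some_partition_column
  """.stripMargin)

result.write.mode("overwrite").parquet("s3://bucket_name/output/some_report")
```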
* At any point in time, this setup has only a single writer. The writer applies incremental upserts very frequently, though consecutive upsert jobs never overlap. This means that while a reader is running a time-consuming query against a Hudi DataFrame, the writer can complete several upsert (and cleaning) cycles; presumably the reader then still references parquet files that the cleaner has already deleted, since only one file version is retained.
**To Reproduce**
* Load the dataframe using Hudi:
```
val df = ss.read
  .format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("s3://path/to/hudi/table/*/*")
df.createOrReplaceTempView("hudi_table")
```
* Apply time-consuming Spark SQL queries against `hudi_table` (for example, the aggregation sketched above).
* A different Spark process updates the Hudi dataset incrementally.
* After an upsert completes, if the time-consuming query is still running, it fails with the same `FileDownloadException` / `FileNotFoundException` stacktrace shown above.
Let me know if you need further details.
Thank you