Rap70r commented on issue #2586:
URL: https://github.com/apache/hudi/issues/2586#issuecomment-782892789


   Hello nsivabalan, thank you for getting back to me.
   
   * All Hudi tables are stored in S3 buckets. We use Spark Structured 
Streaming to apply incremental updates against S3 Hudi datasets.
   
   * **Stacktrace**
   
   ```
   org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to 
download file path: 
s3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet,
 range: 0-515243, partition values: [empty row], isDataPresent: false
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:252)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:132)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: No such file or directory 
's3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet'
        at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:473)
        at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
        at 
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
        at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:449)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildPrefetcherWithPartitionValues$1(ParquetFileFormat.scala:492)
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:73)
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   ```
   
   * Writer Configs:
   
   ```
   Output Path: S3 path
   hoodie.datasource.write.operation: upsert
   parallelism: 3000
   hoodie.datasource.write.table.type: COPY_ON_WRITE
   hoodie.cleaner.policy: KEEP_LATEST_FILE_VERSIONS
   File Version Retained: 1
   hoodie.datasource.hive_sync.enable: false
   SaveMode: Append
   partitionBy: Single Column
   ```
   
   * Reader Configs:
   
   ```
   val df = ss.read
        .format("org.apache.hudi")
        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
        .load("s3://path/to/hudi/table/*/*")
         
   df.createOrReplaceTempView("hudi_table")
   ```
   
   * At any point of time, this setup has just a single writer.
   
   **To Reproduce**
   
   * Load the dataframe using Hudi:
   ```
   val df = ss.read
        .format("org.apache.hudi")
        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
        .load("s3://path/to/hudi/table/*/*")
         
   df.createOrReplaceTempView("hudi_table")
   ```
   
   * Apply time consuming Spark SQL queries against 'hudi_table'
   
   * A different Spark process updates Hudi dataset incrementally.
   
   * After upsert is done, if the time consuming query is still running, it 
will crash with below error:
   
   ```
   org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to 
download file path: 
s3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet,
 range: 0-515243, partition values: [empty row], isDataPresent: false
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:252)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:132)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: No such file or directory 
's3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet'
        at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:473)
        at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
        at 
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
        at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:449)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildPrefetcherWithPartitionValues$1(ParquetFileFormat.scala:492)
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:73)
        at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   ```
   
   Let me know if you need further details.
   
   Thank you
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to