[ 
https://issues.apache.org/jira/browse/HUDI-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-1330:
---------------------------------
    Labels: pull-request-available  (was: )

> handle prefix filtering at directory level
> ------------------------------------------
>
>                 Key: HUDI-1330
>                 URL: https://issues.apache.org/jira/browse/HUDI-1330
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Utilities
>            Reporter: Vu Ho
>            Priority: Major
>              Labels: pull-request-available
>
> The current DFSPathSelector only ignores the prefixes (_, .) at the file level, while 
> files under intermediate directories are still being considered.
>  E.g. when reading from a Spark Structured Streaming source, which very often 
> contains a .checkpoint directory, all metadata files should be ignored. 
> This is not the case currently.
> {code:java}
> $SPARK_HOME/bin/spark-submit --class 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
> ~/.m2/repository/org/apache/hudi/hudi-utilities-bundle_2.12/0.6.1-SNAPSHOT/hudi-utilities-bundle_2.12-0.6.1-SNAPSHOT.jar
>  --target-base-path 'file:///tmp/hoodie/output/cow' --target-table hoodie_cow 
> --table-type COPY_ON_WRITE --props 'dfs-source.properties' --source-class 
> org.apache.hudi.utilities.sources.ParquetDFSSource  --source-ordering-field 
> ts --op UPSERT --continuous --min-sync-interval-seconds 30 {code}
>  configs:
> {code:java}
> hoodie.upsert.shuffle.parallelism=2
> hoodie.insert.shuffle.parallelism=2
> hoodie.delete.shuffle.parallelism=2
> hoodie.bulkinsert.shuffle.parallelism=2
> hoodie.datasource.write.recordkey.field=id
> hoodie.datasource.write.partitionpath.field=dt
> # DFS Source
> hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie/input {code}
> Stacktrace: 
> {code:java}
> Driver stacktrace:Driver stacktrace: at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
>  at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
>  at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912) 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
>  at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
>  at scala.Option.foreach(Option.scala:257) at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
>  at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
>  at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
>  at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759) at 
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at 
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at 
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at 
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990) at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at 
> org.apache.spark.rdd.RDD.collect(RDD.scala:989) at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:635)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:193)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
>  at 
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242) 
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230) at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:664) at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:663) at 
> org.apache.hudi.utilities.sources.ParquetDFSSource.fromFiles(ParquetDFSSource.java:55)
>  at 
> org.apache.hudi.utilities.sources.ParquetDFSSource.lambda$fetchNextBatch$0(ParquetDFSSource.java:50)
>  at org.apache.hudi.common.util.Option.map(Option.java:104) at 
> org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:50)
>  at 
> org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43) 
> at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:75) at 
> org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:66)
>  at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:335)
>  at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:236)
>  at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
>  ... 4 moreCaused by: org.apache.spark.SparkException: Exception thrown in 
> awaitResult:  at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226) at 
> org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290) at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:538)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:613)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:605)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.run(Task.scala:123) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) ... 3 
> moreCaused by: java.io.IOException: Could not read footer for file: 
> FileStatus{path=file:/tmp/hoodie/input/.checkpoint/commits/0; 
> isDirectory=false; length=29; replication=0; blocksize=0; 
> modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; 
> isSymlink=false} at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
>  at 
> org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
>  at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>  at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
> at 
> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused
>  by: java.lang.RuntimeException: file:/tmp/hoodie/input/.checkpoint/commits/0 
> is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but 
> found [34, 58, 48, 125] at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
>  at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
>  at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
>  at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to