[
https://issues.apache.org/jira/browse/HUDI-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated HUDI-1330:
---------------------------------
Labels: pull-request-available (was: )
> handle prefix filtering at directory level
> ------------------------------------------
>
> Key: HUDI-1330
> URL: https://issues.apache.org/jira/browse/HUDI-1330
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer, Utilities
> Reporter: Vu Ho
> Priority: Major
> Labels: pull-request-available
>
> The current DFSPathSelector only ignores prefixes (_, .) at the file level,
> while files under intermediate directories are still being considered.
> E.g. when reading from a Spark Structured Streaming source, which very often
> contains a .checkpoint directory, all metadata files should be ignored.
> This is not the case currently.
> {code:java}
> $SPARK_HOME/bin/spark-submit --class
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
> ~/.m2/repository/org/apache/hudi/hudi-utilities-bundle_2.12/0.6.1-SNAPSHOT/hudi-utilities-bundle_2.12-0.6.1-SNAPSHOT.jar
> --target-base-path 'file:///tmp/hoodie/output/cow' --target-table hoodie_cow
> --table-type COPY_ON_WRITE --props 'dfs-source.properties' --source-class
> org.apache.hudi.utilities.sources.ParquetDFSSource --source-ordering-field
> ts --op UPSERT --continuous --min-sync-interval-seconds 30 {code}
> configs:
> {code:java}
> hoodie.upsert.shuffle.parallelism=2
> hoodie.insert.shuffle.parallelism=2
> hoodie.delete.shuffle.parallelism=2
> hoodie.bulkinsert.shuffle.parallelism=2
> hoodie.datasource.write.recordkey.field=id
> hoodie.datasource.write.partitionpath.field=dt
> # DFS Source
> hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie/input {code}
> Stacktrace:
> {code:java}
> Driver stacktrace:Driver stacktrace: at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> at scala.Option.foreach(Option.scala:257) at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759) at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at
> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990) at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at
> org.apache.spark.rdd.RDD.collect(RDD.scala:989) at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:635)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
> at
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
> at
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
> at scala.Option.orElse(Option.scala:289) at
> org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:193)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230) at
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:664) at
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:663) at
> org.apache.hudi.utilities.sources.ParquetDFSSource.fromFiles(ParquetDFSSource.java:55)
> at
> org.apache.hudi.utilities.sources.ParquetDFSSource.lambda$fetchNextBatch$0(ParquetDFSSource.java:50)
> at org.apache.hudi.common.util.Option.map(Option.java:104) at
> org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:50)
> at
> org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
> at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:75) at
> org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:66)
> at
> org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:335)
> at
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:236)
> at
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
> ... 4 moreCaused by: org.apache.spark.SparkException: Exception thrown in
> awaitResult: at
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226) at
> org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290) at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:538)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:613)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:605)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
> org.apache.spark.scheduler.Task.run(Task.scala:123) at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) ... 3
> moreCaused by: java.io.IOException: Could not read footer for file:
> FileStatus{path=file:/tmp/hoodie/input/.checkpoint/commits/0;
> isDirectory=false; length=29; replication=0; blocksize=0;
> modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-;
> isSymlink=false} at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
> at
> org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at
> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused
> by: java.lang.RuntimeException: file:/tmp/hoodie/input/.checkpoint/commits/0
> is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but
> found [34, 58, 48, 125] at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)