This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8bcc89c2075 [HUDI-7047] Fix Stack Overflow Caused by partition pruning (#10026)
8bcc89c2075 is described below
commit 8bcc89c2075c59ac12f67f83f47faf67f9b7208e
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Nov 9 08:34:48 2023 -0500
[HUDI-7047] Fix Stack Overflow Caused by partition pruning (#10026)
Co-authored-by: Jonathan Vexler <=>
---
.../main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index 982808fdc42..f68a0cac63b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -45,6 +45,8 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ hasPushedDownPartitionPredicates = true
+
val fileSlices = mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters)
if (fileSlices.isEmpty) {
Seq.empty
@@ -55,13 +57,13 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = fileSlices.map(slice => {
if (slice.getBaseFile.isPresent) {
slice.getBaseFile.get().getFileStatus
- } else if (slice.getLogFiles.findAny().isPresent) {
+ } else if (includeLogFiles && slice.getLogFiles.findAny().isPresent) {
slice.getLogFiles.findAny().get().getFileStatus
} else {
null
}
}).filter(slice => slice != null)
- val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+ val c = fileSlices.filter(f => (includeLogFiles && f.getLogFiles.findAny().isPresent)
|| (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
if (c.nonEmpty) {
@@ -85,7 +87,6 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
}
}.toSeq
- hasPushedDownPartitionPredicates = true
if (shouldReadAsPartitionedTable()) {
prunedPartitionsAndFilteredFileSlices
} else if (shouldEmbedFileSlices) {