This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4aed85f1b09 [HUDI-7053] Fix the filter pushdown logic for file group reader (#10018)
4aed85f1b09 is described below
commit 4aed85f1b0986d99ac7b93d4be9cbf6e6377ae91
Author: Lin Liu <[email protected]>
AuthorDate: Thu Nov 9 10:16:17 2023 -0800
[HUDI-7053] Fix the filter pushdown logic for file group reader (#10018)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala | 10 ++++++++--
.../datasources/parquet/NewHoodieParquetFileFormat.scala | 7 ++++++-
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 4073d064dd5..01e08d7cb0f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -295,14 +295,16 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
PartitionedFile => Iterator[InternalRow],
PartitionedFile => Iterator[InternalRow]) = {
+ val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters,
tableState.recordKeyField)
val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
filters ++ requiredFilters, options, new Configuration(hadoopConf))
//file reader for reading a hudi base file that needs to be merged with
log files
val preMergeBaseFileReader = if (isMOR) {
// Add support for reading files using inline file system.
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema,
- requiredSchemaWithMandatory, requiredFilters, options, new
Configuration(hadoopConf))
+ super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchemaWithMandatory,
+ if (shouldUseRecordPosition) requiredFilters else
recordKeyRelatedFilters ++ requiredFilters,
+ options, new Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
@@ -353,6 +355,10 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
(baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader)
}
+ /** Filters referencing ONLY the record key; any other column would be tested against stale pre-merge base values. */
+ protected def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: String): Seq[Filter] =
+   filters.filter(f => f.references.nonEmpty && f.references.forall(_.equalsIgnoreCase(recordKeyColumn)))
+
 protected def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = {
 fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 4fe921bc440..e77e353c66c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -201,9 +201,10 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
}
//file reader for reading a hudi base file that needs to be merged with
log files
+ val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters,
tableState.value.recordKeyField)
val preMergeBaseFileReader = if (isMOR) {
super.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
- requiredSchemaWithMandatory, requiredFilters, options, new
Configuration(hadoopConf))
+ requiredSchemaWithMandatory, requiredFilters ++
recordKeyRelatedFilters, options, new Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
@@ -371,4 +372,8 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
protected def getLogFilesFromSlice(fileSlice: FileSlice):
List[HoodieLogFile] = {
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
}
+
+ protected def getRecordKeyRelatedFilters(filters: Seq[Filter],
recordKeyColumn: String): Seq[Filter] = {
+ filters.filter(f => f.references.exists(c =>
c.equalsIgnoreCase(recordKeyColumn)))
+ }
}