This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aed85f1b09 [HUDI-7053] Fix the filter pushdown logic for file group 
reader (#10018)
4aed85f1b09 is described below

commit 4aed85f1b0986d99ac7b93d4be9cbf6e6377ae91
Author: Lin Liu <[email protected]>
AuthorDate: Thu Nov 9 10:16:17 2023 -0800

    [HUDI-7053] Fix the filter pushdown logic for file group reader (#10018)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala  | 10 ++++++++--
 .../datasources/parquet/NewHoodieParquetFileFormat.scala       |  7 ++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 4073d064dd5..01e08d7cb0f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -295,14 +295,16 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     PartitionedFile => Iterator[InternalRow],
     PartitionedFile => Iterator[InternalRow]) = {
 
+    val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, 
tableState.recordKeyField)
     val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
       filters ++ requiredFilters, options, new Configuration(hadoopConf))
 
     //file reader for reading a hudi base file that needs to be merged with 
log files
     val preMergeBaseFileReader = if (isMOR) {
       // Add support for reading files using inline file system.
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema,
-        requiredSchemaWithMandatory, requiredFilters, options, new 
Configuration(hadoopConf))
+      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchemaWithMandatory,
+        if (shouldUseRecordPosition) requiredFilters else 
recordKeyRelatedFilters ++ requiredFilters,
+        options, new Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
@@ -353,6 +355,10 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader)
   }
 
+  protected def getRecordKeyRelatedFilters(filters: Seq[Filter], 
recordKeyColumn: String): Seq[Filter] = {
+    filters.filter(f => f.references.exists(c => 
c.equalsIgnoreCase(recordKeyColumn)))
+  }
+
   protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
     
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 4fe921bc440..e77e353c66c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -201,9 +201,10 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
     }
 
     //file reader for reading a hudi base file that needs to be merged with 
log files
+   val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, 
tableState.value.recordKeyField)
     val preMergeBaseFileReader = if (isMOR) {
       super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
-        requiredSchemaWithMandatory, requiredFilters, options, new 
Configuration(hadoopConf))
+        requiredSchemaWithMandatory, requiredFilters ++ 
recordKeyRelatedFilters, options, new Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
@@ -371,4 +372,8 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
   protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
     
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
   }
+
+  protected def getRecordKeyRelatedFilters(filters: Seq[Filter], 
recordKeyColumn: String): Seq[Filter] = {
+    filters.filter(f => f.references.exists(c => 
c.equalsIgnoreCase(recordKeyColumn)))
+  }
 }

Reply via email to