This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a9a96816fc6 feat: introduce pk filter to log file (#14205)
5a9a96816fc6 is described below

commit 5a9a96816fc6e833fd14a2df8ee2f03289c5395d
Author: TheR1sing3un <[email protected]>
AuthorDate: Tue Nov 18 10:07:05 2025 +0800

    feat: introduce pk filter to log file (#14205)
    
    1. introduce pk filter to log file
    
    Signed-off-by: TheR1sing3un <[email protected]>
---
 .../hudi/io/storage/HoodieSparkParquetReader.java  | 62 ++++++++++++++++++++--
 .../SparkFileFormatInternalRowReaderContext.scala  | 10 ++--
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  2 +
 .../apache/hudi/storage/StorageConfiguration.java  | 12 +++++
 .../sql/hudi/dml/insert/TestInsertTable.scala      | 13 +++--
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |  4 ++
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |  4 ++
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |  4 ++
 .../spark/sql/adapter/BaseSpark4Adapter.scala      |  5 ++
 9 files changed, 103 insertions(+), 13 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index bb0bc82f4d84..5e34fda2147c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -39,22 +39,30 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.SchemaRepair;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
 import 
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import 
org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -63,6 +71,7 @@ import scala.Option$;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static 
org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
 import static 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
+import static 
org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
 
 public class HoodieSparkParquetReader implements HoodieSparkFileReader {
 
@@ -121,6 +130,16 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
   }
 
   public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema 
requestedSchema) throws IOException {
+    return getUnsafeRowIterator(requestedSchema, Collections.emptyList());
+  }
+
+  /**
+   * Read parquet with requested schema and filters.
+   * WARN:
+   * Currently, the filter must only contain field references related to the 
primary key, as the primary key does not involve schema evolution.
+   * If it is necessary to expand to push down more fields in the future, 
please consider the issue of schema evolution carefully
+   */
+  public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema 
requestedSchema, List<Filter> readFilters) throws IOException {
     Schema nonNullSchema = 
AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
     StructType structSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
     Option<MessageType> messageSchema = 
Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
@@ -128,13 +147,44 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
     StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? 
SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : 
getFileSchema());
     SparkBasicSchemaEvolution evolution = new 
SparkBasicSchemaEvolution(dataStructType, structSchema, 
SQLConf.get().sessionLocalTimeZone());
     String readSchemaJson = evolution.getRequestSchema().json();
+    SQLConf sqlConf = SQLConf.get();
     storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, 
readSchemaJson);
     storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), 
readSchemaJson);
-    storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), 
SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
-    storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), 
SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
+    storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), 
sqlConf.getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
+    storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), 
sqlConf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
+    RebaseDateTime.RebaseSpec rebaseDateSpec = 
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED");
+    boolean parquetFilterPushDown = 
storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(), 
sqlConf.parquetRecordFilterEnabled());
+    if (parquetFilterPushDown && readFilters != null && 
!readFilters.isEmpty()) {
+      ParquetMetadata parquetMetadataWithoutRowGroup = 
getParquetMetadataWithoutRowGroup();
+      ParquetFilters parquetFilters = new ParquetFilters(
+          parquetMetadataWithoutRowGroup.getFileMetaData().getSchema(),
+          
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED().key(),
 sqlConf.parquetFilterPushDownDate()),
+          
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED().key(),
 sqlConf.parquetFilterPushDownTimestamp()),
+          
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED().key(),
 sqlConf.parquetFilterPushDownDecimal()),
+          
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED().key(),
+              
SparkAdapterSupport$.MODULE$.sparkAdapter().enableParquetFilterPushDownStringPredicate(sqlConf)),
+          
storage.getConf().getInt(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD().key(),
 sqlConf.parquetFilterPushDownInFilterThreshold()),
+          storage.getConf().getBoolean(SQLConf.CASE_SENSITIVE().key(), 
sqlConf.caseSensitiveAnalysis()),
+          rebaseDateSpec
+      );
+      Option<FilterPredicate> predicateOpt = 
Option.fromJavaOptional(readFilters
+          .stream()
+          .map(filter -> parquetFilters.createFilter(filter))
+          .filter(opt -> opt.isDefined())
+          .map(opt -> opt.get())
+          .reduce(FilterApi::and));
+      predicateOpt
+          .ifPresent(predicate -> {
+            // set the filter predicate, it will be used to filter row groups 
and records(may be)
+            
ParquetInputFormat.setFilterPredicate(storage.getConf().unwrapAs(Configuration.class),
 predicate);
+            // explicitly specify whether to filter records
+            storage.getConf().set(RECORD_FILTERING_ENABLED.toString(),
+                
String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
 sqlConf.parquetRecordFilterEnabled())));
+          });
+    }
     ParquetReader<InternalRow> reader = ParquetReader.builder(new 
HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
-            
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
-            
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), 
messageSchema),
+                rebaseDateSpec,
+                
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), 
messageSchema),
             new Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
@@ -168,6 +218,10 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
     return schemaOption.get();
   }
 
+  private ParquetMetadata getParquetMetadataWithoutRowGroup() {
+    return ((ParquetUtils) parquetUtils).readMetadata(storage, path);
+  }
+
   protected StructType getStructSchema() {
     if (structTypeOption.isEmpty()) {
       MessageType messageType = getFileSchema();
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 271f9c9aafc4..7005c2211cc7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -46,6 +46,8 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, 
StructType}
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 
+import scala.collection.JavaConverters._
+
 /**
  * Implementation of [[HoodieReaderContext]] to read [[InternalRow]]s with
  * [[ParquetFileFormat]] on Spark.
@@ -80,17 +82,17 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
     if (hasRowIndexField) {
       assert(getRecordContext.supportsParquetRowIndex())
     }
+    val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
+    val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, 
hasRowIndexField)
     if (FSUtils.isLogFile(filePath)) {
-      // TODO: introduce pk filter in log file reader
+      // NOTE: now only primary key based filtering is supported for log files
       new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
-        
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema).asInstanceOf[ClosableIterator[InternalRow]]
+        
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, 
readFilters.asJava).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
       // partition value is empty because the spark parquet reader will append 
the partition columns to
       // each row if they are given. That is the only usage of the partition 
values in the reader.
       val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
         .createPartitionedFile(InternalRow.empty, filePath, start, length)
-      val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, 
hasRowIndexField)
 
       // Convert Avro dataSchema to Parquet MessageType for timestamp 
precision conversion
       val tableSchemaOpt = if (dataSchema != null) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index b803084831fc..7436ca046d6c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -245,6 +245,8 @@ trait SparkAdapter extends Serializable {
 
   def getRebaseSpec(policy: String): RebaseSpec
 
+  def enableParquetFilterPushDownStringPredicate(conf: SQLConf): Boolean
+
   /**
    * Gets the [[UTF8String]] factory implementation for the current Spark 
version.
    * [SPARK-46832] [[UTF8String]] doesn't support compareTo anymore since 
Spark 4.0
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
index 15f0333fd5b5..69a7b1befc05 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
@@ -126,6 +126,18 @@ public abstract class StorageConfiguration<T> implements 
Serializable {
     return value.isPresent() ? Long.parseLong(value.get()) : defaultValue;
   }
 
+  /**
+   * Gets the int value of a property key if present, or the default value if 
not.
+   *
+   * @param key          property key in String.
+   * @param defaultValue default value is the property does not exist.
+   * @return the property value if present, or the default value.
+   */
+  public final int getInt(String key, int defaultValue) {
+    Option<String> value = getString(key);
+    return value.isPresent() ? Integer.parseInt(value.get()) : defaultValue;
+  }
+
   /**
    * Gets the Enum value of a property key if present, or the default value if 
not.
    *
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index 6bff70c2dbc5..2c4cc00110da 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -844,20 +844,23 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         // Filter(id = 1) and Filter(name = 'a3') can be push down, 
Filter(price <> 10) can't be push down since it's not primary key
         var df = spark.sql(s"select price, ts, dt from $tableName where (id = 
1 or name = 'a3') and price <> 10")
         // only execute file scan physical plan
-        // expected in file scan only (id: 1), (id: 3) and (id: 4, from log 
file) matched, (id: 3) and (id: 4, from log file)  matched but will be filtered 
later
-        
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+        // expected in file scan only (id: 1), (id: 3) matched, (id: 3)  
matched but will be filtered later
+        // Filter(id = 1) and Filter(name = 'a3') will filter out (id: 2, 4) 
in base file,
+        // and filter out (id: 4) in log file
+        
assertResult(2)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
 
         checkAnswer(s"select price, ts, dt from $tableName where (id > 1 or 
name = 'a3') and price <> 10")(
           Seq(11.0, 1000, "2023-12-06")
         )
         // Filter(id > 1) and Filter(name = 'a3') can be push down, 
Filter(price <> 10) can't be push down since it's not primary key
         df = spark.sql(s"select price, ts, dt from $tableName where (id > 1 or 
name = 'a3') and price <> 10")
-        // expected in file scan only (id: 1, from log file) (id: 2), (id: 3) 
and (id: 4, from log file) matched, (id: 1, from log file) (id: 3) and (id: 4)  
matched but will be filtered later
-        
assertResult(4)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+        // expected in file scan only (id: 2), (id: 3) and (id: 4, from log 
file) matched, (id: 3) and (id: 4)  matched but will be filtered later
+        // Filter(id > 1) and Filter(name = 'a3') will filter out (id: 0, 1) 
in base file,
+        // and filter out (id: 0) in log file
+        
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
       }
 
       withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "false") {
-        spark.sql(s"set ${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}=false")
         checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or 
name = 'a3') and price <> 10")(
           Seq(11.0, 2000, "2023-12-06")
         )
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 25594e2e41f5..fcaf296af640 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -171,4 +171,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
     RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
   }
+
+  override def enableParquetFilterPushDownStringPredicate(conf: SQLConf): 
Boolean = {
+    conf.parquetFilterPushDownStringStartWith
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index a652885fd75e..ead10fdf6603 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -170,4 +170,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
     RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
   }
+
+  override def enableParquetFilterPushDownStringPredicate(conf: SQLConf): 
Boolean = {
+    conf.parquetFilterPushDownStringPredicate
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index a140733c5c4a..c83ed1d87b1f 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -186,4 +186,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
   override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
     RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
   }
+
+  override def enableParquetFilterPushDownStringPredicate(conf: SQLConf): 
Boolean = {
+    conf.parquetFilterPushDownStringPredicate
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index e22607d8a416..94fdc3df4ef5 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.{PartitionedFileUtil, 
QueryExecution, SQLE
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.HoodieFormatTrait
 import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
@@ -186,4 +187,8 @@ abstract class BaseSpark4Adapter extends SparkAdapter with 
Logging {
       applyFiltersToPlan(logicalRelation, requiredSchema, resolvedSchema,
         
relation.fileFormat.asInstanceOf[HoodieFormatTrait].getRequiredFilters))
   }
+
+  override def enableParquetFilterPushDownStringPredicate(conf: SQLConf): 
Boolean = {
+    conf.parquetFilterPushDownStringPredicate
+  }
 }

Reply via email to