This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5a9a96816fc6 feat: introduce pk filter to log file (#14205)
5a9a96816fc6 is described below
commit 5a9a96816fc6e833fd14a2df8ee2f03289c5395d
Author: TheR1sing3un <[email protected]>
AuthorDate: Tue Nov 18 10:07:05 2025 +0800
feat: introduce pk filter to log file (#14205)
1. introduce pk filter to log file
Signed-off-by: TheR1sing3un <[email protected]>
---
.../hudi/io/storage/HoodieSparkParquetReader.java | 62 ++++++++++++++++++++--
.../SparkFileFormatInternalRowReaderContext.scala | 10 ++--
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 2 +
.../apache/hudi/storage/StorageConfiguration.java | 12 +++++
.../sql/hudi/dml/insert/TestInsertTable.scala | 13 +++--
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 4 ++
.../apache/spark/sql/adapter/Spark3_4Adapter.scala | 4 ++
.../apache/spark/sql/adapter/Spark3_5Adapter.scala | 4 ++
.../spark/sql/adapter/BaseSpark4Adapter.scala | 5 ++
9 files changed, 103 insertions(+), 13 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index bb0bc82f4d84..5e34fda2147c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -39,22 +39,30 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.SchemaRepair;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters;
import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
import
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import
org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -63,6 +71,7 @@ import scala.Option$;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static
org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
import static
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
+import static
org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
public class HoodieSparkParquetReader implements HoodieSparkFileReader {
@@ -121,6 +130,16 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
}
public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema
requestedSchema) throws IOException {
+ return getUnsafeRowIterator(requestedSchema, Collections.emptyList());
+ }
+
+ /**
+ * Read parquet with requested schema and filters.
+ * WARN:
+ * Currently, the filter must only contain field references related to the
primary key, as the primary key does not involve schema evolution.
+ * If it is necessary to expand to push down more fields in the future,
please consider the issue of schema evolution carefully
+ */
+ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema
requestedSchema, List<Filter> readFilters) throws IOException {
Schema nonNullSchema =
AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
StructType structSchema =
HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
Option<MessageType> messageSchema =
Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
@@ -128,13 +147,44 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
StructType dataStructType = convertToStruct(enableTimestampFieldRepair ?
SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) :
getFileSchema());
SparkBasicSchemaEvolution evolution = new
SparkBasicSchemaEvolution(dataStructType, structSchema,
SQLConf.get().sessionLocalTimeZone());
String readSchemaJson = evolution.getRequestSchema().json();
+ SQLConf sqlConf = SQLConf.get();
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA,
readSchemaJson);
storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(),
readSchemaJson);
- storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(),
SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
- storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
+ storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(),
sqlConf.getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
+ storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
sqlConf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
+ RebaseDateTime.RebaseSpec rebaseDateSpec =
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED");
+ boolean parquetFilterPushDown =
storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
sqlConf.parquetRecordFilterEnabled());
+ if (parquetFilterPushDown && readFilters != null &&
!readFilters.isEmpty()) {
+ ParquetMetadata parquetMetadataWithoutRowGroup =
getParquetMetadataWithoutRowGroup();
+ ParquetFilters parquetFilters = new ParquetFilters(
+ parquetMetadataWithoutRowGroup.getFileMetaData().getSchema(),
+
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED().key(),
sqlConf.parquetFilterPushDownDate()),
+
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED().key(),
sqlConf.parquetFilterPushDownTimestamp()),
+
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED().key(),
sqlConf.parquetFilterPushDownDecimal()),
+
storage.getConf().getBoolean(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED().key(),
+
SparkAdapterSupport$.MODULE$.sparkAdapter().enableParquetFilterPushDownStringPredicate(sqlConf)),
+
storage.getConf().getInt(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD().key(),
sqlConf.parquetFilterPushDownInFilterThreshold()),
+ storage.getConf().getBoolean(SQLConf.CASE_SENSITIVE().key(),
sqlConf.caseSensitiveAnalysis()),
+ rebaseDateSpec
+ );
+ Option<FilterPredicate> predicateOpt =
Option.fromJavaOptional(readFilters
+ .stream()
+ .map(filter -> parquetFilters.createFilter(filter))
+ .filter(opt -> opt.isDefined())
+ .map(opt -> opt.get())
+ .reduce(FilterApi::and));
+ predicateOpt
+ .ifPresent(predicate -> {
+ // set the filter predicate, it will be used to filter row groups
and records (maybe)
+
ParquetInputFormat.setFilterPredicate(storage.getConf().unwrapAs(Configuration.class),
predicate);
+ // explicitly specify whether to filter records
+ storage.getConf().set(RECORD_FILTERING_ENABLED.toString(),
+
String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
sqlConf.parquetRecordFilterEnabled())));
+ });
+ }
ParquetReader<InternalRow> reader = ParquetReader.builder(new
HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
-
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
-
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"),
messageSchema),
+ rebaseDateSpec,
+
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"),
messageSchema),
new Path(path.toUri()))
.withConf(storage.getConf().unwrapAs(Configuration.class))
.build();
@@ -168,6 +218,10 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
return schemaOption.get();
}
+ private ParquetMetadata getParquetMetadataWithoutRowGroup() {
+ return ((ParquetUtils) parquetUtils).readMetadata(storage, path);
+ }
+
protected StructType getStructSchema() {
if (structTypeOption.isEmpty()) {
MessageType messageType = getFileSchema();
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 271f9c9aafc4..7005c2211cc7 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -46,6 +46,8 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField,
StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import scala.collection.JavaConverters._
+
/**
* Implementation of [[HoodieReaderContext]] to read [[InternalRow]]s with
* [[ParquetFileFormat]] on Spark.
@@ -80,17 +82,17 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
if (hasRowIndexField) {
assert(getRecordContext.supportsParquetRowIndex())
}
+ val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
+ val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType,
hasRowIndexField)
if (FSUtils.isLogFile(filePath)) {
- // TODO: introduce pk filter in log file reader
+ // NOTE: now only primary key based filtering is supported for log files
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
-
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema).asInstanceOf[ClosableIterator[InternalRow]]
+
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema,
readFilters.asJava).asInstanceOf[ClosableIterator[InternalRow]]
} else {
- val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
// partition value is empty because the spark parquet reader will append
the partition columns to
// each row if they are given. That is the only usage of the partition
values in the reader.
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(InternalRow.empty, filePath, start, length)
- val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType,
hasRowIndexField)
// Convert Avro dataSchema to Parquet MessageType for timestamp
precision conversion
val tableSchemaOpt = if (dataSchema != null) {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index b803084831fc..7436ca046d6c 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -245,6 +245,8 @@ trait SparkAdapter extends Serializable {
def getRebaseSpec(policy: String): RebaseSpec
+ def enableParquetFilterPushDownStringPredicate(conf: SQLConf): Boolean
+
/**
* Gets the [[UTF8String]] factory implementation for the current Spark
version.
* [SPARK-46832] [[UTF8String]] doesn't support compareTo anymore since
Spark 4.0
diff --git
a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
index 15f0333fd5b5..69a7b1befc05 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
@@ -126,6 +126,18 @@ public abstract class StorageConfiguration<T> implements
Serializable {
return value.isPresent() ? Long.parseLong(value.get()) : defaultValue;
}
+ /**
+ * Gets the int value of a property key if present, or the default value if
not.
+ *
+ * @param key property key in String.
+ * @param defaultValue default value if the property does not exist.
+ * @return the property value if present, or the default value.
+ */
+ public final int getInt(String key, int defaultValue) {
+ Option<String> value = getString(key);
+ return value.isPresent() ? Integer.parseInt(value.get()) : defaultValue;
+ }
+
/**
* Gets the Enum value of a property key if present, or the default value if
not.
*
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index 6bff70c2dbc5..2c4cc00110da 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -844,20 +844,23 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
// Filter(id = 1) and Filter(name = 'a3') can be pushed down,
Filter(price <> 10) can't be pushed down since it's not a primary key
var df = spark.sql(s"select price, ts, dt from $tableName where (id =
1 or name = 'a3') and price <> 10")
// only execute file scan physical plan
- // expected in file scan only (id: 1), (id: 3) and (id: 4, from log
file) matched, (id: 3) and (id: 4, from log file) matched but will be filtered
later
-
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+ // expected in file scan only (id: 1), (id: 3) matched, (id: 3)
matched but will be filtered later
+ // Filter(id = 1) and Filter(name = 'a3') will filter out (id: 2, 4)
in base file,
+ // and filter out (id: 4) in log file
+
assertResult(2)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
checkAnswer(s"select price, ts, dt from $tableName where (id > 1 or
name = 'a3') and price <> 10")(
Seq(11.0, 1000, "2023-12-06")
)
// Filter(id > 1) and Filter(name = 'a3') can be pushed down,
Filter(price <> 10) can't be pushed down since it's not a primary key
df = spark.sql(s"select price, ts, dt from $tableName where (id > 1 or
name = 'a3') and price <> 10")
- // expected in file scan only (id: 1, from log file) (id: 2), (id: 3)
and (id: 4, from log file) matched, (id: 1, from log file) (id: 3) and (id: 4)
matched but will be filtered later
-
assertResult(4)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+ // expected in file scan only (id: 2), (id: 3) and (id: 4, from log
file) matched, (id: 3) and (id: 4) matched but will be filtered later
+ // Filter(id > 1) and Filter(name = 'a3') will filter out (id: 0, 1)
in base file,
+ // and filter out (id: 0) in log file
+
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
}
withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "false") {
- spark.sql(s"set ${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}=false")
checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or
name = 'a3') and price <> 10")(
Seq(11.0, 2000, "2023-12-06")
)
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 25594e2e41f5..fcaf296af640 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -171,4 +171,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
}
+
+ override def enableParquetFilterPushDownStringPredicate(conf: SQLConf):
Boolean = {
+ conf.parquetFilterPushDownStringStartWith
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index a652885fd75e..ead10fdf6603 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -170,4 +170,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
}
+
+ override def enableParquetFilterPushDownStringPredicate(conf: SQLConf):
Boolean = {
+ conf.parquetFilterPushDownStringPredicate
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index a140733c5c4a..c83ed1d87b1f 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -186,4 +186,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
}
+
+ override def enableParquetFilterPushDownStringPredicate(conf: SQLConf):
Boolean = {
+ conf.parquetFilterPushDownStringPredicate
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index e22607d8a416..94fdc3df4ef5 100644
---
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.{PartitionedFileUtil,
QueryExecution, SQLE
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.HoodieFormatTrait
import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
@@ -186,4 +187,8 @@ abstract class BaseSpark4Adapter extends SparkAdapter with
Logging {
applyFiltersToPlan(logicalRelation, requiredSchema, resolvedSchema,
relation.fileFormat.asInstanceOf[HoodieFormatTrait].getRequiredFilters))
}
+
+ override def enableParquetFilterPushDownStringPredicate(conf: SQLConf):
Boolean = {
+ conf.parquetFilterPushDownStringPredicate
+ }
}