This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b6e254266f [HUDI-8943] Partition stats should skip queries with null
or complex expressions (#12744)
6b6e254266f is described below
commit 6b6e254266fa071e811023a788cf72883cc054c8
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jan 31 04:12:47 2025 +0530
[HUDI-8943] Partition stats should skip queries with null or complex
expressions (#12744)
* [HUDI-8943] Partition stats should skip queries with null or complex
expressions
---------
Co-authored-by: sivabalan <[email protected]>
---
.../scala/org/apache/hudi/HoodieFileIndex.scala | 4 +-
.../apache/hudi/PartitionStatsIndexSupport.scala | 89 +++++++++++-----------
.../TestPartitionStatsIndexWithSql.scala | 22 +++++-
3 files changed, 64 insertions(+), 51 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 930bd7519b1..3b5d7bd3c33 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -320,9 +320,8 @@ case class HoodieFileIndex(spark: SparkSession,
} else if (isPartitionedTable && isDataSkippingEnabled) {
// For partitioned table and no partition filters, if data skipping is
enabled,
// try using the PARTITION_STATS index to prune the partitions
- lazy val filterReferencedColumns = collectReferencedColumns(spark,
dataFilters, schema)
val prunedPartitionPaths = new PartitionStatsIndexSupport(spark,
schema, metadataConfig, metaClient)
- .prunePartitions(this, dataFilters, filterReferencedColumns)
+ .prunePartitions(this, dataFilters)
if (prunedPartitionPaths.nonEmpty) {
try {
(true, prunedPartitionPaths.get.map(e =>
convertToPartitionPath(e)).toSeq)
@@ -333,6 +332,7 @@ case class HoodieFileIndex(spark: SparkSession,
}
} else if (isExpressionIndexEnabled) {
val expressionIndexSupport = getExpressionIndexSupport
+ lazy val filterReferencedColumns = collectReferencedColumns(spark,
dataFilters, schema)
val exprIndexPrunedPartitionsOpt =
expressionIndexSupport.prunePartitions(this, dataFilters,
filterReferencedColumns)
if (exprIndexPrunedPartitionsOpt.nonEmpty) {
(true, exprIndexPrunedPartitionsOpt.get.map(e =>
convertToPartitionPath(e)).toSeq)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 90c5f1db397..9ae6c491f4c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -20,6 +20,7 @@
package org.apache.hudi
import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieFileIndex.collectReferencedColumns
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats,
HoodieMetadataRecord}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
columnStatsRecords
}
- def prunePartitions(fileIndex: HoodieFileIndex,
- queryFilters: Seq[Expression],
- queryReferencedColumns: Seq[String]):
Option[Set[String]] = {
- if (isIndexAvailable && queryFilters.nonEmpty &&
queryReferencedColumns.nonEmpty) {
- if (containsAnySqlFunction(queryFilters)) {
- // If the query contains any SQL function, skip the pruning.
- // Expression Index will be used in such cases, if available.
- Option.empty
- } else {
+ def prunePartitions(fileIndex: HoodieFileIndex, queryFilters:
Seq[Expression]): Option[Set[String]] = {
+ if (isIndexAvailable && queryFilters.nonEmpty) {
+ // Filter out sql queries. Partition stats only supports simple queries
on field attribute without any operation on the field
+ val nonSqlFilters = filterNonSqlExpressions(queryFilters)
+ val indexedCols: Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+ // Filter out queries involving null and value count checks
+ val filteredQueryFilters: Seq[Expression] =
filterExpressionsExcludingNullAndValue(nonSqlFilters, indexedCols)
+ lazy val queryReferencedColumns = collectReferencedColumns(spark,
filteredQueryFilters, tableSchema)
+
+ if (filteredQueryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
val readInMemory = shouldReadInMemory(fileIndex,
queryReferencedColumns, inMemoryProjectionThreshold)
loadTransposed(queryReferencedColumns, readInMemory, Option.empty,
Option.empty) {
transposedPartitionStatsDF => {
- val indexedCols: Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
- if (canLookupInPSI(queryFilters, indexedCols)) {
- try {
-
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
- val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
- if (allPartitions.nonEmpty) {
- // PARTITION_STATS index exist for all or some columns in
the filters
- // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has
covered the case where the
- // column in a filter does not have the stats
available, by making sure such a
- // filter does not prune any partition.
- // to be fixed. HUDI-8836.
- val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols =
indexedCols)).reduce(And)
- if (indexFilter.equals(TrueLiteral)) {
- // if there are any non indexed cols or we can't translate
source expr, we can prune partitions based on col stats lookup.
- Some(allPartitions)
- } else {
- Some(transposedPartitionStatsDF.where(new
Column(indexFilter))
-
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet)
- }
+ try {
+
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
+ val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+ if (allPartitions.nonEmpty) {
+ // PARTITION_STATS index exist for all or some columns in the
filters
+ // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has
covered the case where the
+ // column in a filter does not have the stats available,
by making sure such a
+ // filter does not prune any partition.
+ // to be fixed. HUDI-8836.
+ val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols =
indexedCols)).reduce(And)
+ if (indexFilter.equals(TrueLiteral)) {
+ // if there are any non indexed cols or we can't translate
source expr, we cannot prune partitions based on col stats lookup.
+ Some(allPartitions)
} else {
- // PARTITION_STATS index does not exist for any column in
the filters, skip the pruning
- Option.empty
+ Some(transposedPartitionStatsDF.where(new
Column(indexFilter))
+ .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet)
}
- } finally {
- transposedPartitionStatsDF.unpersist()
+ } else {
+ // PARTITION_STATS index does not exist for any column in the
filters, skip the pruning
+ Option.empty
}
- } else { // for null filters, we can't look up in PSI.
- Option.empty
+ } finally {
+ transposedPartitionStatsDF.unpersist()
}
}
}
+ } else {
+ Option.empty
}
} else {
Option.empty
}
}
- private def canLookupInPSI(queryFilters: Seq[Expression], indexedCols:
Seq[String]): Boolean = {
- // If no queryFilter contains null/value-count filters, then we can do a
lookup
- !queryFilters.exists(containsNullOrValueCountBasedFilters(_, indexedCols))
+ private def filterExpressionsExcludingNullAndValue(queryFilters:
Seq[Expression], indexedCols: Seq[String]): Seq[Expression] = {
+ // Filter queries which do not contain null/value-count filters
+ queryFilters.filter(query => !containsNullOrValueCountBasedFilters(query,
indexedCols))
}
- private def containsAnySqlFunction(queryFilters: Seq[Expression]): Boolean =
{
+ private def filterNonSqlExpressions(queryFilters: Seq[Expression]):
Seq[Expression] = {
val isMatchingExpression = (expr: Expression) => {
expr.find {
case _: UnaryExpression => true
@@ -162,7 +161,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
case _ => false
}.isDefined
}
- queryFilters.exists(isMatchingExpression)
+ queryFilters.filter(query => !isMatchingExpression(query))
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 504a74e7767..16b076fc8f4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -30,7 +30,7 @@ import
org.apache.hudi.metadata.{HoodieMetadataFileSystemView, MetadataPartition
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
Expression, GreaterThan, LessThan, Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
Expression, GreaterThan, IsNotNull, LessThan, Literal, Or}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
@@ -261,6 +261,14 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
GreaterThan(AttributeReference("rider", StringType)(),
Literal("rider-D")),
HoodieTableMetaClient.reload(metaClient),
isDataSkippingExpected = true)
+ // Include an isNotNull check
+ verifyFilePruningExpressions(
+ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+ HoodieMetadataConfig.ENABLE.key -> "true"),
+ Seq(IsNotNull(AttributeReference("rider", StringType)()),
GreaterThan(AttributeReference("rider", StringType)(), Literal("rider-D"))),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = true)
// if we predicate on a col which is not indexed, we expect full scan.
verifyFilePruning(
Map(
@@ -625,12 +633,18 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
isDataSkippingExpected = false)
}
- private def verifyFilePruning(opts: Map[String, String], dataFilter:
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean,
isNoScanExpected: Boolean = false): Unit = {
+ private def verifyFilePruning(opts: Map[String, String], dataFilter:
Expression, metaClient: HoodieTableMetaClient,
+ isDataSkippingExpected: Boolean,
isNoScanExpected: Boolean = false): Unit = {
+ verifyFilePruningExpressions(opts, Seq(dataFilter), metaClient,
isDataSkippingExpected, isNoScanExpected)
+ }
+
+ private def verifyFilePruningExpressions(opts: Map[String, String],
dataFilters: Seq[Expression], metaClient: HoodieTableMetaClient,
+ isDataSkippingExpected: Boolean,
isNoScanExpected: Boolean = false): Unit = {
// with data skipping
val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts,
includeLogFiles = true)
try {
- val filteredPartitionDirectories = fileIndex.listFiles(Seq(),
Seq(dataFilter))
+ val filteredPartitionDirectories = fileIndex.listFiles(Seq(),
dataFilters)
val filteredFilesCount = filteredPartitionDirectories.flatMap(s =>
s.files).size
val latestDataFilesCount = getLatestDataFilesCount(metaClient =
metaClient)
if (isDataSkippingExpected) {
@@ -646,7 +660,7 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
// with no data skipping
fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts +
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles =
true)
- val filesCountWithNoSkipping = fileIndex.listFiles(Seq(),
Seq(dataFilter)).flatMap(s => s.files).size
+ val filesCountWithNoSkipping = fileIndex.listFiles(Seq(),
dataFilters).flatMap(s => s.files).size
assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
} finally {
fileIndex.close()