This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6e254266f [HUDI-8943] Partition stats should skip queries with null 
or complex expressions (#12744)
6b6e254266f is described below

commit 6b6e254266fa071e811023a788cf72883cc054c8
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jan 31 04:12:47 2025 +0530

    [HUDI-8943] Partition stats should skip queries with null or complex 
expressions (#12744)
    
    * [HUDI-8943] Partition stats should skip queries with null or complex 
expressions
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  4 +-
 .../apache/hudi/PartitionStatsIndexSupport.scala   | 89 +++++++++++-----------
 .../TestPartitionStatsIndexWithSql.scala           | 22 +++++-
 3 files changed, 64 insertions(+), 51 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 930bd7519b1..3b5d7bd3c33 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -320,9 +320,8 @@ case class HoodieFileIndex(spark: SparkSession,
       } else if (isPartitionedTable && isDataSkippingEnabled) {
         // For partitioned table and no partition filters, if data skipping is 
enabled,
         // try using the PARTITION_STATS index to prune the partitions
-        lazy val filterReferencedColumns = collectReferencedColumns(spark, 
dataFilters, schema)
         val prunedPartitionPaths = new PartitionStatsIndexSupport(spark, 
schema, metadataConfig, metaClient)
-          .prunePartitions(this, dataFilters, filterReferencedColumns)
+          .prunePartitions(this, dataFilters)
         if (prunedPartitionPaths.nonEmpty) {
           try {
             (true, prunedPartitionPaths.get.map(e => 
convertToPartitionPath(e)).toSeq)
@@ -333,6 +332,7 @@ case class HoodieFileIndex(spark: SparkSession,
           }
         } else if (isExpressionIndexEnabled) {
           val expressionIndexSupport = getExpressionIndexSupport
+          lazy val filterReferencedColumns = collectReferencedColumns(spark, 
dataFilters, schema)
           val exprIndexPrunedPartitionsOpt = 
expressionIndexSupport.prunePartitions(this, dataFilters, 
filterReferencedColumns)
           if (exprIndexPrunedPartitionsOpt.nonEmpty) {
             (true, exprIndexPrunedPartitionsOpt.get.map(e => 
convertToPartitionPath(e)).toSeq)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 90c5f1db397..9ae6c491f4c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -20,6 +20,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieFileIndex.collectReferencedColumns
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, 
HoodieMetadataRecord}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
     columnStatsRecords
   }
 
-  def prunePartitions(fileIndex: HoodieFileIndex,
-                      queryFilters: Seq[Expression],
-                      queryReferencedColumns: Seq[String]): 
Option[Set[String]] = {
-    if (isIndexAvailable && queryFilters.nonEmpty && 
queryReferencedColumns.nonEmpty) {
-      if (containsAnySqlFunction(queryFilters)) {
-        // If the query contains any SQL function, skip the pruning.
-        // Expression Index will be used in such cases, if available.
-        Option.empty
-      } else {
+  def prunePartitions(fileIndex: HoodieFileIndex, queryFilters: 
Seq[Expression]): Option[Set[String]] = {
+    if (isIndexAvailable && queryFilters.nonEmpty) {
+      // Filter out sql queries. Partition stats only supports simple queries 
on field attribute without any operation on the field
+      val nonSqlFilters = filterNonSqlExpressions(queryFilters)
+      val indexedCols: Seq[String] = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+      // Filter out queries involving null and value count checks
+      val filteredQueryFilters: Seq[Expression] = 
filterExpressionsExcludingNullAndValue(nonSqlFilters, indexedCols)
+      lazy val queryReferencedColumns = collectReferencedColumns(spark, 
filteredQueryFilters, tableSchema)
+
+      if (filteredQueryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
         val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
         loadTransposed(queryReferencedColumns, readInMemory, Option.empty, 
Option.empty) {
           transposedPartitionStatsDF => {
-            val indexedCols: Seq[String] = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
-            if (canLookupInPSI(queryFilters, indexedCols)) {
-              try {
-                
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
-                val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-                  .collect()
-                  .map(_.getString(0))
-                  .toSet
-                if (allPartitions.nonEmpty) {
-                  // PARTITION_STATS index exist for all or some columns in 
the filters
-                  // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has 
covered the case where the
-                  //       column in a filter does not have the stats 
available, by making sure such a
-                  //       filter does not prune any partition.
-                  // to be fixed. HUDI-8836.
-                  val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = 
indexedCols)).reduce(And)
-                  if (indexFilter.equals(TrueLiteral)) {
-                    // if there are any non indexed cols or we can't translate 
source expr, we can prune partitions based on col stats lookup.
-                    Some(allPartitions)
-                  } else {
-                    Some(transposedPartitionStatsDF.where(new 
Column(indexFilter))
-                      
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-                      .collect()
-                      .map(_.getString(0))
-                      .toSet)
-                  }
+            try {
+              
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
+              val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+                .collect()
+                .map(_.getString(0))
+                .toSet
+              if (allPartitions.nonEmpty) {
+                // PARTITION_STATS index exist for all or some columns in the 
filters
+                // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has 
covered the case where the
+                //       column in a filter does not have the stats available, 
by making sure such a
+                //       filter does not prune any partition.
+                // to be fixed. HUDI-8836.
+                val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = 
indexedCols)).reduce(And)
+                if (indexFilter.equals(TrueLiteral)) {
+                  // if there are any non indexed cols or we can't translate 
source expr, we cannot prune partitions based on col stats lookup.
+                  Some(allPartitions)
                 } else {
-                  // PARTITION_STATS index does not exist for any column in 
the filters, skip the pruning
-                  Option.empty
+                  Some(transposedPartitionStatsDF.where(new 
Column(indexFilter))
+                    .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+                    .collect()
+                    .map(_.getString(0))
+                    .toSet)
                 }
-              } finally {
-                transposedPartitionStatsDF.unpersist()
+              } else {
+                // PARTITION_STATS index does not exist for any column in the 
filters, skip the pruning
+                Option.empty
               }
-            } else { // for null filters, we can't look up in PSI.
-              Option.empty
+            } finally {
+              transposedPartitionStatsDF.unpersist()
             }
           }
         }
+      } else {
+        Option.empty
       }
     } else {
       Option.empty
     }
   }
 
-  private def canLookupInPSI(queryFilters: Seq[Expression], indexedCols: 
Seq[String]): Boolean = {
-    // If no queryFilter contains null/value-count filters, then we can do a 
lookup
-    !queryFilters.exists(containsNullOrValueCountBasedFilters(_, indexedCols))
+  private def filterExpressionsExcludingNullAndValue(queryFilters: 
Seq[Expression], indexedCols: Seq[String]): Seq[Expression] = {
+    // Filter queries which do not contain null/value-count filters
+    queryFilters.filter(query => !containsNullOrValueCountBasedFilters(query, 
indexedCols))
   }
 
-  private def containsAnySqlFunction(queryFilters: Seq[Expression]): Boolean = 
{
+  private def filterNonSqlExpressions(queryFilters: Seq[Expression]): 
Seq[Expression] = {
     val isMatchingExpression = (expr: Expression) => {
       expr.find {
         case _: UnaryExpression => true
@@ -162,7 +161,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
         case _ => false
       }.isDefined
     }
-    queryFilters.exists(isMatchingExpression)
+    queryFilters.filter(query => !isMatchingExpression(query))
   }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 504a74e7767..16b076fc8f4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -30,7 +30,7 @@ import 
org.apache.hudi.metadata.{HoodieMetadataFileSystemView, MetadataPartition
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
Expression, GreaterThan, LessThan, Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
Expression, GreaterThan, IsNotNull, LessThan, Literal, Or}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
@@ -261,6 +261,14 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
           GreaterThan(AttributeReference("rider", StringType)(), 
Literal("rider-D")),
           HoodieTableMetaClient.reload(metaClient),
           isDataSkippingExpected = true)
+        // Include an isNotNull check
+        verifyFilePruningExpressions(
+          Map(
+            DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+            HoodieMetadataConfig.ENABLE.key -> "true"),
+          Seq(IsNotNull(AttributeReference("rider", StringType)()), 
GreaterThan(AttributeReference("rider", StringType)(), Literal("rider-D"))),
+          HoodieTableMetaClient.reload(metaClient),
+          isDataSkippingExpected = true)
         // if we predicate on a col which is not indexed, we expect full scan.
         verifyFilePruning(
           Map(
@@ -625,12 +633,18 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
       isDataSkippingExpected = false)
   }
 
-  private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean, 
isNoScanExpected: Boolean = false): Unit = {
+  private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient,
+                                isDataSkippingExpected: Boolean, 
isNoScanExpected: Boolean = false): Unit = {
+    verifyFilePruningExpressions(opts, Seq(dataFilter), metaClient, 
isDataSkippingExpected, isNoScanExpected)
+  }
+
+  private def verifyFilePruningExpressions(opts: Map[String, String], 
dataFilters: Seq[Expression], metaClient: HoodieTableMetaClient,
+                                           isDataSkippingExpected: Boolean, 
isNoScanExpected: Boolean = false): Unit = {
     // with data skipping
     val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
     var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, 
includeLogFiles = true)
     try {
-      val filteredPartitionDirectories = fileIndex.listFiles(Seq(), 
Seq(dataFilter))
+      val filteredPartitionDirectories = fileIndex.listFiles(Seq(), 
dataFilters)
       val filteredFilesCount = filteredPartitionDirectories.flatMap(s => 
s.files).size
       val latestDataFilesCount = getLatestDataFilesCount(metaClient = 
metaClient)
       if (isDataSkippingExpected) {
@@ -646,7 +660,7 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
 
       // with no data skipping
       fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + 
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = 
true)
-      val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), 
Seq(dataFilter)).flatMap(s => s.files).size
+      val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), 
dataFilters).flatMap(s => s.files).size
       assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
     } finally {
       fileIndex.close()

Reply via email to