nsivabalan commented on code in PR #12744:
URL: https://github.com/apache/hudi/pull/12744#discussion_r1936151857


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
     columnStatsRecords
   }
 
-  def prunePartitions(fileIndex: HoodieFileIndex,
-                      queryFilters: Seq[Expression],
-                      queryReferencedColumns: Seq[String]): 
Option[Set[String]] = {
-    if (isIndexAvailable && queryFilters.nonEmpty && 
queryReferencedColumns.nonEmpty) {
-      if (containsAnySqlFunction(queryFilters)) {
-        // If the query contains any SQL function, skip the pruning.
-        // Expression Index will be used in such cases, if available.
-        Option.empty
-      } else {
+  def prunePartitions(fileIndex: HoodieFileIndex, queryFilters: 
Seq[Expression]): Option[Set[String]] = {
+    if (isIndexAvailable) {

Review Comment:
   We can do the following:
   ```
   if (isIndexAvailable && queryFilters.nonEmpty) {
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
     columnStatsRecords
   }
 
-  def prunePartitions(fileIndex: HoodieFileIndex,
-                      queryFilters: Seq[Expression],
-                      queryReferencedColumns: Seq[String]): 
Option[Set[String]] = {
-    if (isIndexAvailable && queryFilters.nonEmpty && 
queryReferencedColumns.nonEmpty) {
-      if (containsAnySqlFunction(queryFilters)) {
-        // If the query contains any SQL function, skip the pruning.
-        // Expression Index will be used in such cases, if available.
-        Option.empty
-      } else {
+  def prunePartitions(fileIndex: HoodieFileIndex, queryFilters: 
Seq[Expression]): Option[Set[String]] = {
+    if (isIndexAvailable) {
+      // Filter out sql queries. Partition stats only supports simple queries 
on field attribute without any operation on the field
+      val nonSqlFilters = filterNonSqlExpressions(queryFilters)
+      val indexedCols: Seq[String] = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+      // Filter out queries involving null and value count checks
+      val filteredQueryFilters: Seq[Expression] = 
filterExpressionsExcludingNullAndValue(nonSqlFilters, indexedCols)
+      lazy val queryReferencedColumns = collectReferencedColumns(spark, 
filteredQueryFilters, tableSchema)
+
+      if (filteredQueryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
         val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
         loadTransposed(queryReferencedColumns, readInMemory, Option.empty, 
Option.empty) {
           transposedPartitionStatsDF => {
-            val indexedCols: Seq[String] = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
-            if (canLookupInPSI(queryFilters, indexedCols)) {
-              try {
-                
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
-                val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-                  .collect()
-                  .map(_.getString(0))
-                  .toSet
-                if (allPartitions.nonEmpty) {
-                  // PARTITION_STATS index exist for all or some columns in 
the filters
-                  // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has 
covered the case where the
-                  //       column in a filter does not have the stats 
available, by making sure such a
-                  //       filter does not prune any partition.
-                  // to be fixed. HUDI-8836.
-                  val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = 
indexedCols)).reduce(And)
-                  if (indexFilter.equals(TrueLiteral)) {
-                    // if there are any non indexed cols or we can't translate 
source expr, we can prune partitions based on col stats lookup.
-                    Some(allPartitions)
-                  } else {
-                    Some(transposedPartitionStatsDF.where(new 
Column(indexFilter))
-                      
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-                      .collect()
-                      .map(_.getString(0))
-                      .toSet)
-                  }
+            try {
+              
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
+              val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+                .collect()
+                .map(_.getString(0))
+                .toSet
+              if (allPartitions.nonEmpty) {
+                // PARTITION_STATS index exist for all or some columns in the 
filters
+                // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has 
covered the case where the
+                //       column in a filter does not have the stats available, 
by making sure such a
+                //       filter does not prune any partition.
+                // to be fixed. HUDI-8836.
+                val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = 
indexedCols)).reduce(And)
+                if (indexFilter.equals(TrueLiteral)) {
+                  // if there are any non indexed cols or we can't translate 
source expr, we can prune partitions based on col stats lookup.
+                  Some(allPartitions)
                 } else {
-                  // PARTITION_STATS index does not exist for any column in 
the filters, skip the pruning
-                  Option.empty
+                  Some(transposedPartitionStatsDF.where(new 
Column(indexFilter))

Review Comment:
   We already have `allPartitions`, right?
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
     columnStatsRecords
   }
 
-  def prunePartitions(fileIndex: HoodieFileIndex,
-                      queryFilters: Seq[Expression],
-                      queryReferencedColumns: Seq[String]): 
Option[Set[String]] = {
-    if (isIndexAvailable && queryFilters.nonEmpty && 
queryReferencedColumns.nonEmpty) {
-      if (containsAnySqlFunction(queryFilters)) {
-        // If the query contains any SQL function, skip the pruning.
-        // Expression Index will be used in such cases, if available.
-        Option.empty
-      } else {
+  def prunePartitions(fileIndex: HoodieFileIndex, queryFilters: 
Seq[Expression]): Option[Set[String]] = {
+    if (isIndexAvailable) {
+      // Filter out sql queries. Partition stats only supports simple queries 
on field attribute without any operation on the field
+      val nonSqlFilters = filterNonSqlExpressions(queryFilters)
+      val indexedCols: Seq[String] = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+      // Filter out queries involving null and value count checks
+      val filteredQueryFilters: Seq[Expression] = 
filterExpressionsExcludingNullAndValue(nonSqlFilters, indexedCols)
+      lazy val queryReferencedColumns = collectReferencedColumns(spark, 
filteredQueryFilters, tableSchema)
+
+      if (filteredQueryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
         val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
         loadTransposed(queryReferencedColumns, readInMemory, Option.empty, 
Option.empty) {
           transposedPartitionStatsDF => {
-            val indexedCols: Seq[String] = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
-            if (canLookupInPSI(queryFilters, indexedCols)) {
-              try {
-                
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
-                val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-                  .collect()
-                  .map(_.getString(0))
-                  .toSet
-                if (allPartitions.nonEmpty) {
-                  // PARTITION_STATS index exist for all or some columns in 
the filters
-                  // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has 
covered the case where the
-                  //       column in a filter does not have the stats 
available, by making sure such a
-                  //       filter does not prune any partition.
-                  // to be fixed. HUDI-8836.
-                  val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = 
indexedCols)).reduce(And)
-                  if (indexFilter.equals(TrueLiteral)) {
-                    // if there are any non indexed cols or we can't translate 
source expr, we can prune partitions based on col stats lookup.
-                    Some(allPartitions)
-                  } else {
-                    Some(transposedPartitionStatsDF.where(new 
Column(indexFilter))
-                      
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-                      .collect()
-                      .map(_.getString(0))
-                      .toSet)
-                  }
+            try {
+              
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
+              val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+                .collect()
+                .map(_.getString(0))
+                .toSet
+              if (allPartitions.nonEmpty) {
+                // PARTITION_STATS index exist for all or some columns in the 
filters
+                // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has 
covered the case where the
+                //       column in a filter does not have the stats available, 
by making sure such a
+                //       filter does not prune any partition.
+                // to be fixed. HUDI-8836.
+                val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = 
indexedCols)).reduce(And)
+                if (indexFilter.equals(TrueLiteral)) {
+                  // if there are any non indexed cols or we can't translate 
source expr, we can prune partitions based on col stats lookup.

Review Comment:
   Minor: the code comment should read "we cannot prune".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to