lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1907314092


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala:
##########
@@ -92,6 +95,47 @@ class ExpressionIndexSupport(spark: SparkSession,
     }
   }
 
+  def prunePartitions(fileIndex: HoodieFileIndex,

Review Comment:
   Addressed



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -330,9 +330,19 @@ case class HoodieFileIndex(spark: SparkSession,
             // fall back to listing all partitions
             case _: HoodieException => (false, 
listMatchingPartitionPaths(Seq.empty))
           }
+        } else if (isExpressionIndexEnabled) {
+          val expressionIndexSupport = new ExpressionIndexSupport(spark, 
schema, metadataConfig, metaClient)

Review Comment:
   Addressed



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala:
##########
@@ -92,6 +95,47 @@ class ExpressionIndexSupport(spark: SparkSession,
     }
   }
 
+  def prunePartitions(fileIndex: HoodieFileIndex,
+                      queryFilters: Seq[Expression],
+                      queryReferencedColumns: Seq[String]): 
Option[Set[String]] = {
+    lazy val expressionIndexPartitionOpt = 
getExpressionIndexPartitionAndLiterals(queryFilters)
+    if (isIndexAvailable && queryFilters.nonEmpty && 
expressionIndexPartitionOpt.nonEmpty) {
+      val (indexPartition, expressionIndexQuery, _) = 
expressionIndexPartitionOpt.get
+      val indexDefinition = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+      if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
 {
+        val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
+        val expressionIndexRecords = 
loadExpressionIndexPartitionStatRecords(indexDefinition, readInMemory)
+        loadTransposed(queryReferencedColumns, readInMemory, 
expressionIndexRecords, expressionIndexQuery) {
+          transposedPartitionStatsDF => {
+            val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+              .collect()
+              .map(_.getString(0))
+              .toSet
+            if (allPartitions.nonEmpty) {
+              // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has covered the case where the
+              //       column in a filter does not have the stats available, by making sure such a
+              //       filter does not prune any partition.
+              val indexSchema = transposedPartitionStatsDF.schema
+              val indexFilter = 
Seq(expressionIndexQuery).map(translateIntoColumnStatsIndexFilterExpr(_, 
indexSchema, isExpressionIndex = true)).reduce(And)
+              Some(transposedPartitionStatsDF.where(new Column(indexFilter))
+                .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+                .collect()

Review Comment:
   Addressed for both the expression index (EI) and partition stats.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to