yihua commented on a change in pull request #4948:
URL: https://github.com/apache/hudi/pull/4948#discussion_r823200671



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from 
querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || 
queryFilters.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val completedCommits = 
getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
-    // Collect all index tables present in `.zindex` folder
-    val candidateIndexTables =
-      fs.listStatus(new Path(indexPath))
-        .filter(_.isDirectory)
-        .map(_.getPath.getName)
-        .filter(completedCommits.contains(_))
-        .sortBy(x => x)
-
-    if (candidateIndexTables.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val dataFrameOpt = try {
-      Some(spark.read.load(new Path(indexPath, 
candidateIndexTables.last).toString))
-    } catch {
-      case t: Throwable =>
-        logError("Failed to read col-stats index; skipping", t)
-        None
+    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || 
queryFilters.isEmpty) {
+      Option.empty
+    } else {
+      val targetColStatsIndexColumns = Seq(
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+      val requiredMetadataIndexColumns =
+        (targetColStatsIndexColumns :+ 
HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+          s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+      // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+      val metadataTableDF = spark.read.format("org.apache.hudi")
+        
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+      // TODO filter on (column, partition) prefix
+      val colStatsDF = 
metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+        .select(requiredMetadataIndexColumns.map(col): _*)
+
+      val queryReferencedColumns = collectReferencedColumns(spark, 
queryFilters, schema)
+
+      // Persist DF to avoid re-computing column statistics unraveling
+      withPersistence(colStatsDF) {
+        // Metadata Table bears rows in the following format
+        //
+        //  
+---------------------------+------------+------------+------------+-------------+
+        //  |        fileName           | columnName |  minValue  |  maxValue  
|  num_nulls  |
+        //  
+---------------------------+------------+------------+------------+-------------+
+        //  | one_base_file.parquet     |          A |          1 |         10 
|           0 |
+        //  | another_base_file.parquet |          A |        -10 |          0 
|           5 |
+        //  
+---------------------------+------------+------------+------------+-------------+
+        //
+        // While Data Skipping utils are expecting following (transposed) 
format, where per-column stats are
+        // essentially transposed (from rows to columns):
+        //
+        //  
+---------------------------+------------+------------+-------------+
+        //  |          file             | A_minValue | A_maxValue | 
A_num_nulls |
+        //  
+---------------------------+------------+------------+-------------+
+        //  | one_base_file.parquet     |          1 |         10 |           
0 |
+        //  | another_base_file.parquet |        -10 |          0 |           
5 |
+        //  
+---------------------------+------------+------------+-------------+
+        //
+        // NOTE: Column Stats Index might potentially contain statistics for 
many columns (if not all), while
+        //       query at hand might only be referencing a handful of those. 
As such, we collect all the
+        //       column references from the filtering expressions, and only 
transpose records corresponding to the
+        //       columns referenced in those
+        val transposedColStatsDF =
+        queryReferencedColumns.map(colName =>
+          
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
+            .select(targetColStatsIndexColumns.map(col): _*)
+            
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, 
getNumNullsColumnNameFor(colName))
+            
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, 
getMinColumnNameFor(colName))
+            
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, 
getMaxColumnNameFor(colName))
+        )
+          .reduceLeft((left, right) =>
+            left.join(right, usingColumn = 
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))

Review comment:
       Based on the code, I thought all columns from predicates are going to 
trigger joining (i.e., n number of columns -> n-1 joins), not just the 
clustering columns, since column stats index in metadata table can contain all 
columns from the schema.
   
   There are cases where the table is fat (1k to 10k+ number of columns, see 
[this 
blog](https://hudi.apache.org/blog/2021/09/01/building-eb-level-data-lake-using-hudi-at-bytedance/))
 and the queries can have more than 10 predicates at Uber and ByteDance.   
Bytedance has PB-level tables which can easily have 10s of M files in a few 
partitions.  I worry that the joining can take a hit for this kind of scale.
   
   I understand that some kind of "joining" is needed here, but the spark table 
join in the current scheme expands the table after each join and adds 
additional col stats column.  If for each of the df from a column from the 
following applies the filter first and generate a boolean for each file, then 
the next step is going to do AND, which does not require expanding columns and 
an additional cached table, reducing memory pressure and possible shuffling.  
Then that is much less costly than spark table/df join.
   
   ```
   queryReferencedColumns.map(colName =>
             
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
               .select(targetColStatsIndexColumns.map(col): _*)
               
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, 
getNumNullsColumnNameFor(colName))
               
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, 
getMinColumnNameFor(colName))
               
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, 
getMaxColumnNameFor(colName))
           )
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to