codope commented on a change in pull request #4948:
URL: https://github.com/apache/hudi/pull/4948#discussion_r821266033



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from 
querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || 
queryFilters.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val completedCommits = 
getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
-    // Collect all index tables present in `.zindex` folder
-    val candidateIndexTables =
-      fs.listStatus(new Path(indexPath))
-        .filter(_.isDirectory)
-        .map(_.getPath.getName)
-        .filter(completedCommits.contains(_))
-        .sortBy(x => x)
-
-    if (candidateIndexTables.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val dataFrameOpt = try {
-      Some(spark.read.load(new Path(indexPath, 
candidateIndexTables.last).toString))
-    } catch {
-      case t: Throwable =>
-        logError("Failed to read col-stats index; skipping", t)
-        None
+    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || 
queryFilters.isEmpty) {

Review comment:
       `fs.exists` is going to happen in every call if data skipping is 
enabled. This will hit perf as we observed in Presto. We should try to avoid 
it. I think we should just assume that metadata table exists and error out if 
it doesn't.

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -259,34 +260,4 @@ object DataSkippingUtils extends Logging {
         throw new AnalysisException(s"convert reference to name failed,  Found 
unsupported expression ${other}")
     }
   }
-
-  def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = 
{

Review comment:
       Why remove these two methods? Are they not being used anywhere?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
##########
@@ -333,6 +333,57 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     assert(fileIndex.getAllQueryPartitionPaths.get(0).path.equals("c"))
   }
 
+  @Test
+  def testDataSkippingWhileFileListing(): Unit = {
+    val r = new Random(0xDEED)
+    val tuples = for (i <- 1 to 1000) yield (i, 1000 - i, r.nextString(5), 
r.nextInt(4))
+
+    val _spark = spark
+    import _spark.implicits._
+    val inputDF = tuples.toDF("id", "inv_id", "str", "rand")
+
+    val opts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      RECORDKEY_FIELD.key -> "id",
+      PRECOMBINE_FIELD.key -> "id",
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> 
"true",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+    )
+
+    inputDF.repartition(4)
+      .write
+      .format("hudi")
+      .options(opts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 100 * 1024)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    val props = Map[String, String](
+      "path" -> basePath,
+      QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    )
+
+    val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, 
NoopCache)
+
+    val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
+    assertEquals(10, allFilesPartitions.head.files.length)
+
+    // We're selecting a single file that contains "id" == 1 row, which there 
should be
+    // strictly 1. Given that 1 is minimal possible value, Data Skipping 
should be able to
+    // truncate search space to just a single file
+    val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = 
false)(), Literal(1))

Review comment:
       perhaps we can add tests for more expressions apart from `EqualTo`

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from 
querying engine
    * @return list of pruned (data-skipped) candidate base-files' names
    */
-  private def lookupCandidateFilesInColStatsIndex(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val indexPath = metaClient.getColumnStatsIndexPath
+  private def lookupCandidateFilesInMetadataTable(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
     val fs = metaClient.getFs
+    val metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || 
queryFilters.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val completedCommits = 
getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
-    // Collect all index tables present in `.zindex` folder
-    val candidateIndexTables =
-      fs.listStatus(new Path(indexPath))
-        .filter(_.isDirectory)
-        .map(_.getPath.getName)
-        .filter(completedCommits.contains(_))
-        .sortBy(x => x)
-
-    if (candidateIndexTables.isEmpty) {
-      // scalastyle:off return
-      return Success(Option.empty)
-      // scalastyle:on return
-    }
-
-    val dataFrameOpt = try {
-      Some(spark.read.load(new Path(indexPath, 
candidateIndexTables.last).toString))
-    } catch {
-      case t: Throwable =>
-        logError("Failed to read col-stats index; skipping", t)
-        None
+    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || 
queryFilters.isEmpty) {
+      Option.empty
+    } else {
+      val targetColStatsIndexColumns = Seq(
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+        HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+      val requiredMetadataIndexColumns =
+        (targetColStatsIndexColumns :+ 
HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+          s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+      // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+      val metadataTableDF = spark.read.format("org.apache.hudi")
+        
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+      // TODO filter on (column, partition) prefix
+      val colStatsDF = 
metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+        .select(requiredMetadataIndexColumns.map(col): _*)
+
+      val queryReferencedColumns = collectReferencedColumns(spark, 
queryFilters, schema)
+
+      // Persist DF to avoid re-computing column statistics unraveling
+      withPersistence(colStatsDF) {

Review comment:
       +1 for persistence

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -109,6 +109,14 @@
                                 "string"
                             ]
                         },
+                        {
+                            "doc": "Column name for which this column 
statistics applies",

Review comment:
       This change we'll have to revist when we tackle schema evolution. Can 
you please track it in a JIRA?

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -119,18 +121,14 @@ case class HoodieFileIndex(spark: SparkSession,
     //    - Col-Stats Index is present
     //    - List of predicates (filters) is present
     val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInColStatsIndex(dataFilters) match {
+      lookupCandidateFilesInMetadataTable(dataFilters) match {
         case Success(opt) => opt
         case Failure(e) =>
-          if (e.isInstanceOf[AnalysisException]) {
-            logDebug("Failed to relay provided data filters to Z-index 
lookup", e)
-          } else {
-            logError("Failed to lookup candidate files in Z-index", e)
-          }
+          logError("Failed to lookup candidate files in Z-index", e)

Review comment:
       maybe change `z-index` to column stats index in this err msg as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to