lokeshj1703 commented on code in PR #9345:
URL: https://github.com/apache/hudi/pull/9345#discussion_r1284805940


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files 
mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = 
getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, 
partitionsAndFileSlices).map {

Review Comment:
   Done



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files 
mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = 
getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, 
partitionsAndFileSlices).map {
+      case (partition, fileSlices) =>
+        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+          val baseFileStatusOpt = 
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+          val logFilesStatus = if (includeLogFiles) {
+            
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, 
FileStatus](lf => lf.getFileStatus))
+          } else {
+            java.util.stream.Stream.empty()
+          }
+          val files = 
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+          baseFileStatusOpt.foreach(f => files.append(f))
+          files
+        })
+
+        PartitionDirectory(InternalRow.fromSeq(partition.get.values), 
allCandidateFiles)

Review Comment:
   This behaviour hasn't changed from the previous implementation. 
`org.apache.hudi.HoodieFileIndex#filterFileSlices` returns the same partition 
that it receives as input, so calling `.get` should be okay here, since we set 
the partition before passing it in.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -125,6 +125,90 @@ class TestColumnStatsIndex extends 
HoodieSparkClientTestBase {
       saveMode = SaveMode.Append)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+  def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit 
= {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    var commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      DataSourceReadOptions.QUERY_TYPE.key -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+    ) ++ metadataOpts
+
+    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = 
"index/colstats/column-stats-index-table.json",
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+
+    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/another-input-table-json",
+      expectedColStatsSourcePath = 
"index/colstats/updated-column-stats-index-table.json",
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    // NOTE: MOR and COW have different fixtures since MOR is bearing 
delta-log files (holding
+    //       deferred updates), diverging from COW
+    val expectedColStatsSourcePath = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-updated2-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-updated2-column-stats-index-table.json"
+    }
+
+    createSQLTable(commonOpts, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+    val c5GT50WithDataSkipping = spark.sql("select * from tbl where c5 > 
70").count()
+
+    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update-input-table-json",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    // verify snapshot query
+    verifySQLQueries(c5GT50WithDataSkipping, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, commonOpts)
+
+    // verify read_optimized query
+    verifySQLQueries(c5GT50WithDataSkipping, 
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, commonOpts)
+
+    // verify incremental query
+    verifySQLQueries(c5GT50WithDataSkipping, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts)
+    commonOpts = commonOpts + 
(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key
 -> "true")
+    verifySQLQueries(c5GT50WithDataSkipping, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts)
+  }
+
+  private def verifySQLQueries(c5GT50WithDataSkippingAtPrevInstant: Long, 
queryType: String, opts: Map[String, String]): Unit = {
+    // 2 records are updated with c5 greater than 70 and one record is 
inserted with c5 value greater than 70
+    var commonOpts:Map[String, String] = opts
+    createSQLTable(commonOpts, queryType)
+    val increment = if 
(queryType.equals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) && 
metaClient.getTableType == HoodieTableType.MERGE_ON_READ) {
+      1 // only one insert
+    } else {
+      3 // one insert and two upserts
+    }
+    assertEquals(spark.sql("select * from tbl where c5 > 70").count(), 
c5GT50WithDataSkippingAtPrevInstant + increment)
+    val c5GT50WithDataSkipping = spark.sql("select * from tbl where c5 > 
70").count()
+
+    if 
(queryType.equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
+      createIncrementalSQLTable(commonOpts, 
metaClient.reloadActiveTimeline().getInstants.get(1).getTimestamp)
+      assertEquals(spark.sql("select * from tbl where c5 > 70").count(), 3)
+    }
+
+    commonOpts = opts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"false")
+    createSQLTable(commonOpts, queryType)
+    val c5Gt50WithoutDataSkipping = spark.sql("select * from tbl where c5 > 
70").count()

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to