nsivabalan commented on code in PR #11634:
URL: https://github.com/apache/hudi/pull/11634#discussion_r1742488182


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -448,9 +448,9 @@ private boolean initializeFromFilesystem(String 
initializationTime, List<Metadat
 
       // Perform the commit using bulkCommit
       HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
-      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
-      metadataMetaClient.reloadActiveTimeline();
       String partitionPath = (partitionType == FUNCTIONAL_INDEX || 
partitionType == SECONDARY_INDEX) ? 
dataWriteConfig.getIndexingConfig().getIndexName() : 
partitionType.getPartitionPath();
+      bulkCommit(commitTimeForPartition, partitionPath, records, 
fileGroupCount);

Review Comment:
   +1



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -86,6 +196,82 @@ class TestSecondaryIndexPruning extends 
SecondaryIndexTestBase {
       )
       verifyQueryPredicate(hudiOpts, "not_record_key_col")
 
+      // update the secondary key column
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1", true),
+        Seq("cde", "row2", false),
+        Seq("def", "row3", false),
+        Seq("xyz", "row1", false)
+      )
+      // validate data and data skipping
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
+        Seq(1, "row1", "xyz", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col")
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testSecondaryIndexWithPartitionStatsIndex(tableType: String): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts + (
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      val sqlTableType = if 
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
+      tableName += "test_secondary_index_with_partition_stats_index" + 
sqlTableType
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  name string,
+           |  record_key_col string,
+           |  secondary_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = '$sqlTableType',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  'hoodie.metadata.index.partition.stats.enable' = 'true',
+           |  'hoodie.metadata.index.column.stats.column.list' = 'name',
+           |  hoodie.enable.data.skipping = 'true'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'gandhi', 'row1', 'abc', 
'p1')")
+      spark.sql(s"insert into $tableName values(2, 'nehru', 'row2', 'cde', 
'p2')")
+      spark.sql(s"insert into $tableName values(3, 'patel', 'row3', 'def', 
'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_secondary_key_col on $tableName using 
secondary_index(secondary_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_secondary_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1"),
+        Seq("cde", "row2"),
+        Seq("def", "row3")
+      )
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, secondary_key_col, 
partition_key_col from $tableName where secondary_key_col = 'abc'")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "secondary_key_col")

Review Comment:
   Minor: can you close the HoodieFileIndex within verifyQueryPredicate?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1890,7 +1890,7 @@ public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngi
       List<String> logFilePaths = new ArrayList<>();
       baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
       String filePath = baseAndLogFiles.getKey();
-      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(filePath(basePath, partition, filePath));
+      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(FSUtils.constructAbsolutePath(basePath, filePath));

Review Comment:
   BTW, I did triage the base file path. It's coming from HoodieDeltaWriteStat. It 
refers to just the base file name according to the source code. We might have to 
revisit the tests to see whether we generate the right values when we are mocking 
the HoodieDeltaWriteStat. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -105,7 +138,210 @@ class TestSecondaryIndexPruning extends 
SecondaryIndexTestBase {
     }
   }
 
-  private def checkAnswer(sql: String)(expects: Seq[Any]*): Unit = {
-    assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(sql).collect().sortBy(_.toString()))
+  @Test
+  def testSecondaryIndexPruningWithUpdates(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts + (
+        DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.COPY_ON_WRITE.name(),

Review Comment:
   Let's also add a test for a non-partitioned table.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1890,7 +1890,7 @@ public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngi
       List<String> logFilePaths = new ArrayList<>();
       baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
       String filePath = baseAndLogFiles.getKey();
-      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(filePath(basePath, partition, filePath));
+      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(FSUtils.constructAbsolutePath(basePath, filePath));

Review Comment:
   Can we rename filePath at L1877 to baseFilePath?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1890,7 +1890,7 @@ public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngi
       List<String> logFilePaths = new ArrayList<>();
       baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
       String filePath = baseAndLogFiles.getKey();
-      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(filePath(basePath, partition, filePath));
+      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(FSUtils.constructAbsolutePath(basePath, filePath));

Review Comment:
   Yes, same question as Ethan: was there a bug before that we are fixing?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1874,7 +1874,7 @@ public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngi
       List<String> logFilePaths = new ArrayList<>();
       baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
       String filePath = baseAndLogFiles.getKey();
-      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(filePath(basePath, partition, filePath));
+      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(FSUtils.constructAbsolutePath(basePath, filePath));

Review Comment:
   Comment on L1875: do we have tests for non-partitioned tables for the 
functional and secondary indexes?
   ```
   baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath + 
StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
   ```
   
   This could create a path with two separators (/) for a non-partitioned table. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to