nsivabalan commented on code in PR #12214:
URL: https://github.com/apache/hudi/pull/12214#discussion_r1833546809


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -697,6 +698,420 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {

Review Comment:
   can we get some more tests where a single batch contains 100s of records. 
   All tests I see where are ingesting just 1 record (1 insert or 1 delete or 2 
update). 
   can you create a temp source table and then ingest from the source to the 
target table 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -697,6 +698,420 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_multiple_updates_same_record"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do another insert and validate compaction in metadata table
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz2' where 
record_key_col = 'row3'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+
+      )
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz2')")(
+        Seq(3, "row3", "xyz2", "p2")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz2")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithOnlyDeleteLogs(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_only_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do a hard delete
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithUpdateFollowedByDelete(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_update_and_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'def', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false)
+      )
+
+      // few updates for same key
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row2'")
+      // delete for same key that was just updated
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+        Seq(3, "row3", "xyz1", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz1")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(

Review Comment:
   can we check for all keys we have ingested so far. 
   ```
   not_record_key_col in ('abc','def','hjk','xyz','xyz1')
   ```
   
   and change the expected value accordingly. 
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -697,6 +698,420 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_multiple_updates_same_record"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do another insert and validate compaction in metadata table
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz2' where 
record_key_col = 'row3'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+
+      )
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz2')")(
+        Seq(3, "row3", "xyz2", "p2")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz2")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithOnlyDeleteLogs(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_only_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do a hard delete
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithUpdateFollowedByDelete(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_update_and_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'def', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false)
+      )
+
+      // few updates for same key
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row2'")
+      // delete for same key that was just updated
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+        Seq(3, "row3", "xyz1", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz1")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithSameSecondaryKeyUpdatesForMultipleRecords(): Unit 
= {

Review Comment:
   can we try to consolidate lot of these tests into few tests. 
   we just need to cover all cases right. 
   we can also keep adding few batches at a time and do validation. and 
continue w/ few more batches, more validations etc. 
   
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -697,6 +698,420 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_multiple_updates_same_record"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do another insert and validate compaction in metadata table
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz2' where 
record_key_col = 'row3'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+
+      )
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz2')")(
+        Seq(3, "row3", "xyz2", "p2")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz2")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithOnlyDeleteLogs(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_only_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do a hard delete
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithUpdateFollowedByDelete(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_update_and_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'def', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false)
+      )
+
+      // few updates for same key
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row2'")
+      // delete for same key that was just updated
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+        Seq(3, "row3", "xyz1", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz1")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithSameSecondaryKeyUpdatesForMultipleRecords(): Unit 
= {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_same_sec_key"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false)
+      )
+
+      // do updates for different records with same secondary key
+      spark.sql(s"update $tableName set not_record_key_col = 'def' where 
record_key_col = 'row1'")
+      spark.sql(s"update $tableName set not_record_key_col = 'abc' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'poc' where 
record_key_col = 'row2'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(

Review Comment:
   can we validate for keys we ingested 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -697,6 +698,420 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_multiple_updates_same_record"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do another insert and validate compaction in metadata table
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz2' where 
record_key_col = 'row3'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+
+      )
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz2')")(
+        Seq(3, "row3", "xyz2", "p2")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz2")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithOnlyDeleteLogs(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_only_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3",false)
+      )
+
+      // do a hard delete
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithUpdateFollowedByDelete(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_update_and_delete"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'def', 'p1')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p1')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false)
+      )
+
+      // few updates for same key
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz1' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row2'")
+      // delete for same key that was just updated
+      spark.sql(s"delete from $tableName where record_key_col = 'row2'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('xyz','xyz1')")(
+        Seq(3, "row3", "xyz1", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "xyz1")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithSameSecondaryKeyUpdatesForMultipleRecords(): Unit 
= {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "20")
+      val tableName = "test_secondary_index_with_same_sec_key"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '20'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'abc', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'hjk', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+        Seq(s"hjk${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false)
+      )
+
+      // do updates for different records with same secondary key
+      spark.sql(s"update $tableName set not_record_key_col = 'def' where 
record_key_col = 'row1'")
+      spark.sql(s"update $tableName set not_record_key_col = 'abc' where 
record_key_col = 'row3'")
+      spark.sql(s"update $tableName set not_record_key_col = 'poc' where 
record_key_col = 'row2'")
+
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col in ('abc')")(
+        Seq(3, "row3", "abc", "p2")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+    }
+  }
+
+
+  /**
+   * Test case to write with updates and validate secondary index with 
clustering.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testSecondaryIndexWithClusteringAndCleaning(tableType: HoodieTableType): 
Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+        HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "1")
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        hudiOpts = hudiOpts ++ Map(
+          HoodieClusteringConfig.INLINE_CLUSTERING.key -> "true",
+          HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> "1",
+          HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0"
+        )
+      }
+      val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" 
else "mor"
+      tableName += "test_secondary_index_pruning_cluster_clean_" + sqlTableType
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = '$sqlTableType',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql("set hoodie.clustering.inline=true")
+      spark.sql("set hoodie.clustering.inline.max.commits=1")
+
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      confirmLastCommitType(ActionType.replacecommit)
+      spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+      confirmLastCommitType(ActionType.replacecommit)
+      spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+      confirmLastCommitType(ActionType.replacecommit)

Review Comment:
   where do we validate clean commits 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to