This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6755587182b [HUDI-8025] Add tests for index updates with table 
services (#12029)
6755587182b is described below

commit 6755587182ba45a173afd43e15070dc0c7d893e6
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Oct 6 02:13:05 2024 +0530

    [HUDI-8025] Add tests for index updates with table services (#12029)
---
 .../hudi/functional/TestPartitionStatsIndex.scala  | 139 ++++++++++++++-
 .../functional/TestSecondaryIndexPruning.scala     | 193 ++++++++++++++++++++-
 2 files changed, 321 insertions(+), 11 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 1dddeb3ffdd..cba98f4a834 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -22,14 +22,16 @@ package org.apache.hudi.functional
 import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
 import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
-import org.apache.hudi.common.model.{FileSlice, 
HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, 
HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, 
WriteConcurrencyMode, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, 
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieWriteConflictException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView
-import org.apache.hudi.util.JFunction
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieMetadataFileSystemView, MetadataPartitionType}
+import org.apache.hudi.util.{JFunction, JavaConversions}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex}
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
@@ -224,6 +226,135 @@ class TestPartitionStatsIndex extends 
PartitionStatsIndexTestBase {
     assertEquals(67, snapshot0.count())
   }
 
+  /**
+   * Test case to do updates and then validate partition stats with compaction (MOR only) and cleaning.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testPartitionStatsWithCompactionAndCleaning(tableType: HoodieTableType): 
Unit = {
+    var hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      hudiOpts = hudiOpts ++ Map(
+        HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
+        HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+        HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
+      )
+    }
+    // insert followed by two upserts (trigger a compaction so that prev 
version can be cleaned)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    // a clean instant must be present on the active timeline after the writes above
+    val lastCleanInstant = 
getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant()
+    assertTrue(lastCleanInstant.isPresent)
+    // for MOR, a completed instant on the commit timeline is taken as the compaction commit (NOTE(review): the action type itself is not checked)
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      val lastCompactionInstant = 
getLatestMetaClient(false).getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant()
+      assertTrue(lastCompactionInstant.isPresent)
+    }
+
+    // do another upsert and validate the partition stats including file 
pruning
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    validateDataAndPartitionStats()
+    createTempTable(hudiOpts)
+    verifyQueryPredicate(hudiOpts)
+  }
+
+  /**
+   * Test case to do updates and then validate partition stats with inline clustering,
+   * including a rollback of the latest clustering instant.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testPartitionStatsWithClustering(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
+      KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key() -> "true")
+
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    // validate clustering instant; assert on the captured value since it is dereferenced further below
+    val lastClusteringInstant = getLatestClusteringInstant
+    assertTrue(lastClusteringInstant.isPresent)
+    // do two more rounds of upsert to trigger another clustering
+    doWriteAndValidateDataAndPartitionStats(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndPartitionStats(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    // the new clustering instant must be newer than the first one and be the last instant on the timeline
+    assertTrue(getLatestClusteringInstant.get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0)
+    assertEquals(getLatestClusteringInstant, metaClient.getActiveTimeline.lastInstant())
+    // We are validating rollback of a DT clustering instant here
+    rollbackLastInstant(hudiOpts)
+
+    validateDataAndPartitionStats()
+    createTempTable(hudiOpts)
+    verifyQueryPredicate(hudiOpts)
+  }
+
+  /**
+   * Test case to do updates and then validate partition stats with MDT compaction.
+   * Any one table type is enough to test this as we are validating the metadata table.
+   */
+  @Test
+  def testPartitionStatsWithMDTCompaction(): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2"
+    )
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    // validate MDT compaction instant
+    val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable
+      .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
+    try {
+      val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
+      // keep only the instants whose commit metadata records a COMPACT operation and take the latest one
+      val lastCompactionInstant = compactionTimeline
+        .filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
+          HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
+            .getOperationType == WriteOperationType.COMPACT))
+        .lastInstant()
+      // make a missing MDT compaction fail here, before the instant is dereferenced
+      assertTrue(lastCompactionInstant.isPresent)
+      val compactionTime = lastCompactionInstant.get().getTimestamp
+      // the partition stats partition must have a base file written at the compaction instant
+      val compactionBaseFile = metadataTableFSView.getAllBaseFiles(MetadataPartitionType.PARTITION_STATS.getPartitionPath)
+        .filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(compactionTime)))
+        .findAny()
+      assertTrue(compactionBaseFile.isPresent)
+    } finally {
+      metadataTableFSView.close()
+    }
+  }
+
   def verifyQueryPredicate(hudiOpts: Map[String, String]): Unit = {
     val reckey = mergedDfList.last.limit(1).collect().map(row => 
row.getAs("_row_key").toString)
     val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0)))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 6131ce15221..a334504837f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -24,16 +24,18 @@ import 
org.apache.hudi.client.common.HoodieSparkEngineContext
 import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, 
HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, 
HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, 
WriteConcurrencyMode, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieWriteConflictException
 import 
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
-import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, 
HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, 
SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.util.JFunction
+import org.apache.hudi.util.{JFunction, JavaConversions}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex, HoodieSparkUtils}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
@@ -83,7 +85,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       val tableType = testCase.tableType
       val isPartitioned = testCase.isPartitioned
       var hudiOpts = commonOpts
-      hudiOpts = hudiOpts + (
+      hudiOpts = hudiOpts ++ Map(
         DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
         DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
       val sqlTableType = if 
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
@@ -162,7 +164,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
   def testCreateAndDropSecondaryIndex(): Unit = {
     if (HoodieSparkUtils.gteqSpark3_3) {
       var hudiOpts = commonOpts
-      hudiOpts = hudiOpts + (
+      hudiOpts = hudiOpts ++ Map(
         DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
         DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
       tableName += "test_secondary_index_create_drop_partitioned_mor"
@@ -224,7 +226,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       val tableType = testCase.tableType
       val isPartitioned = testCase.isPartitioned
       var hudiOpts = commonOpts
-      hudiOpts = hudiOpts + (
+      hudiOpts = hudiOpts ++ Map(
         DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
         DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
       val sqlTableType = if 
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
@@ -303,7 +305,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       val tableType = testCase.tableType
       val isPartitioned = testCase.isPartitioned
       var hudiOpts = commonOpts
-      hudiOpts = hudiOpts + (
+      hudiOpts = hudiOpts ++ Map(
         DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
         DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
       val sqlTableType = if 
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
@@ -486,6 +488,183 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  /**
+   * Test case to update a secondary key and validate the secondary index with cleaning and (MOR only) inline compaction configured.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testSecondaryIndexWithCompactionAndCleaning(tableType: HoodieTableType): 
Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+        HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        hudiOpts = hudiOpts ++ Map(
+          HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
+          HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+          HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
+        )
+      }
+      val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" 
else "mor"
+      tableName += "test_secondary_index_pruning_compact_clean_" + sqlTableType
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = '$sqlTableType',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        spark.sql("set hoodie.compact.inline=true")
+        spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+        spark.sql("set hoodie.metadata.compact.num.delta.commits=15")
+      }
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+      spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1"),
+        Seq("cde", "row2"),
+        Seq("def", "row3")
+      )
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col")
+
+      // update the secondary key column
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1", true),
+        Seq("cde", "row2", false),
+        Seq("def", "row3", false),
+        Seq("xyz", "row1", false)
+      )
+      // validate data and data skipping
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
+        Seq(1, "row1", "xyz", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col")
+    }
+  }
+
+  /**
+   * Test case to write inserts and validate secondary index with metadata table (MDT) compaction.
+   * Any one table type is enough to test this as we are validating the metadata table.
+   */
+  @Test
+  def testSecondaryIndexWithMDTCompaction(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val hudiOpts = commonOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2")
+      // NOTE(review): this local shadows the suite-level tableName that other tests append to - confirm intended
+      val tableName = "test_secondary_index_with_mdt_compaction"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = 'mor',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+           |  ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '2'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+
+      // do another insert and validate compaction in metadata table
+      spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+      val metadataTableFSView = HoodieSparkTable.create(getWriteConfig(hudiOpts), context())
+        .getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
+      try {
+        val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
+        // keep only the instants whose commit metadata records a COMPACT operation and take the latest one
+        val lastCompactionInstant = compactionTimeline
+          .filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
+            HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
+              .getOperationType == WriteOperationType.COMPACT))
+          .lastInstant()
+        // make a missing MDT compaction fail here, before the instant is dereferenced
+        assertTrue(lastCompactionInstant.isPresent)
+        val compactionTime = lastCompactionInstant.get().getTimestamp
+        // the secondary index partition must have a base file written at the compaction instant
+        val compactionBaseFile = metadataTableFSView.getAllBaseFiles("secondary_index_idx_not_record_key_col")
+          .filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(compactionTime)))
+          .findAny()
+        assertTrue(compactionBaseFile.isPresent)
+      } finally {
+        metadataTableFSView.close()
+      }
+
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1"),
+        Seq("cde", "row2"),
+        Seq("def", "row3")
+      )
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'abc'")(
+        Seq(1, "row1", "abc", "p1")
+      )
+    }
+  }
+
   private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
   }

Reply via email to