This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6755587182b [HUDI-8025] Add tests for index updates with table services (#12029)
6755587182b is described below
commit 6755587182ba45a173afd43e15070dc0c7d893e6
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Oct 6 02:13:05 2024 +0530
[HUDI-8025] Add tests for index updates with table services (#12029)
---
.../hudi/functional/TestPartitionStatsIndex.scala | 139 ++++++++++++++-
.../functional/TestSecondaryIndexPruning.scala | 193 ++++++++++++++++++++-
2 files changed, 321 insertions(+), 11 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 1dddeb3ffdd..cba98f4a834 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -22,14 +22,16 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
-import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView
-import org.apache.hudi.util.JFunction
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieMetadataFileSystemView, MetadataPartitionType}
+import org.apache.hudi.util.{JFunction, JavaConversions}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
@@ -224,6 +226,135 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
assertEquals(67, snapshot0.count())
}
+ /**
+ * Test case to do updates and then validate partition stats with compaction and cleaning.
+ */
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testPartitionStatsWithCompactionAndCleaning(tableType: HoodieTableType): Unit = {
+ var hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ hudiOpts = hudiOpts ++ Map(
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
+ )
+ }
+ // insert followed by two upserts (trigger a compaction so that prev version can be cleaned)
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ // Clean Operation
+ val lastCleanInstant = getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant()
+ assertTrue(lastCleanInstant.isPresent)
+ // validation that the compaction commit is present in case of MOR table
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val lastCompactionInstant = getLatestMetaClient(false).getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant()
+ assertTrue(lastCompactionInstant.isPresent)
+ }
+
+ // do another upsert and validate the partition stats including file pruning
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+
+ validateDataAndPartitionStats()
+ createTempTable(hudiOpts)
+ verifyQueryPredicate(hudiOpts)
+ }
+
+ /**
+ * Test case to do updates and then validate partition stats with clustering.
+ */
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testPartitionStatsWithClustering(tableType: HoodieTableType): Unit = {
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+ HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
+ KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key() -> "true")
+
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ // validate clustering instant
+ val lastClusteringInstant = getLatestClusteringInstant
+ assertTrue(getLatestClusteringInstant.isPresent)
+ // do two more rounds of upsert to trigger another clustering
+ doWriteAndValidateDataAndPartitionStats(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ doWriteAndValidateDataAndPartitionStats(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+
+ assertTrue(getLatestClusteringInstant.get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0)
+ assertEquals(getLatestClusteringInstant, metaClient.getActiveTimeline.lastInstant())
+ // We are validating rollback of a DT clustering instant here
+ rollbackLastInstant(hudiOpts)
+
+ validateDataAndPartitionStats()
+ createTempTable(hudiOpts)
+ verifyQueryPredicate(hudiOpts)
+ }
+
+ /**
+ * Test case to do updates and then validate partition stats with MDT compaction.
+ * Any one table type is enough to test this as we are validating the metadata table.
+ */
+ @Test
+ def testPartitionStatsWithMDTCompaction(): Unit = {
+ val hudiOpts = commonOpts ++ Map(
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2"
+ )
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ // validate MDT compaction instant
+ val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable
+ .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
+ try {
+ val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
+ val lastCompactionInstant = compactionTimeline
+ .filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
+ HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
+ .getOperationType == WriteOperationType.COMPACT))
+ .lastInstant()
+ val compactionBaseFile = metadataTableFSView.getAllBaseFiles(MetadataPartitionType.PARTITION_STATS.getPartitionPath)
+ .filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp)))
+ .findAny()
+ assertTrue(compactionBaseFile.isPresent)
+ } finally {
+ metadataTableFSView.close()
+ }
+ }
+
def verifyQueryPredicate(hudiOpts: Map[String, String]): Unit = {
val reckey = mergedDfList.last.limit(1).collect().map(row => row.getAs("_row_key").toString)
val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0)))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 6131ce15221..a334504837f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -24,16 +24,18 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
-import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.table.HoodieSparkTable
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.util.JFunction
+import org.apache.hudi.util.{JFunction, JavaConversions}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, HoodieSparkUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
@@ -83,7 +85,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
val tableType = testCase.tableType
val isPartitioned = testCase.isPartitioned
var hudiOpts = commonOpts
- hudiOpts = hudiOpts + (
+ hudiOpts = hudiOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
@@ -162,7 +164,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
def testCreateAndDropSecondaryIndex(): Unit = {
if (HoodieSparkUtils.gteqSpark3_3) {
var hudiOpts = commonOpts
- hudiOpts = hudiOpts + (
+ hudiOpts = hudiOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
tableName += "test_secondary_index_create_drop_partitioned_mor"
@@ -224,7 +226,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
val tableType = testCase.tableType
val isPartitioned = testCase.isPartitioned
var hudiOpts = commonOpts
- hudiOpts = hudiOpts + (
+ hudiOpts = hudiOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
@@ -303,7 +305,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
val tableType = testCase.tableType
val isPartitioned = testCase.isPartitioned
var hudiOpts = commonOpts
- hudiOpts = hudiOpts + (
+ hudiOpts = hudiOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
@@ -486,6 +488,183 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}
+ /**
+ * Test case to write with updates and validate secondary index with compaction and cleaning.
+ */
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testSecondaryIndexWithCompactionAndCleaning(tableType: HoodieTableType): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ hudiOpts = hudiOpts ++ Map(
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
+ )
+ }
+ val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" else "mor"
+ tableName += "test_secondary_index_pruning_compact_clean_" + sqlTableType
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | ts bigint,
+ | record_key_col string,
+ | not_record_key_col string,
+ | partition_key_col string
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | type = '$sqlTableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+ | ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1'
+ | )
+ | partitioned by(partition_key_col)
+ | location '$basePath'
+ """.stripMargin)
+ // by setting small file limit to 0, each insert will create a new file
+ // need to generate more files for a non-partitioned table to test data skipping
+ // as the partitioned table will have only one file per partition
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+ spark.sql("set hoodie.metadata.compact.num.delta.commits=15")
+ }
+ spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+ spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+ spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+ // create secondary index
+ spark.sql(s"create index idx_not_record_key_col on $tableName using
secondary_index(not_record_key_col)")
+ // validate index created successfully
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from
hudi_metadata('$basePath') where type=7")(
+ Seq("abc", "row1"),
+ Seq("cde", "row2"),
+ Seq("def", "row3")
+ )
+ // validate data skipping with filters on secondary key column
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+ Seq(1, "row1", "abc", "p1")
+ )
+ verifyQueryPredicate(hudiOpts, "not_record_key_col")
+
+ // update the secondary key column
+ spark.sql(s"update $tableName set not_record_key_col = 'xyz' where
record_key_col = 'row1'")
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey,
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+ Seq("abc", "row1", true),
+ Seq("cde", "row2", false),
+ Seq("def", "row3", false),
+ Seq("xyz", "row1", false)
+ )
+ // validate data and data skipping
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
+ Seq(1, "row1", "xyz", "p1")
+ )
+ verifyQueryPredicate(hudiOpts, "not_record_key_col")
+ }
+ }
+
+ /**
+ * Test case to write with updates and validate secondary index with MDT compaction.
+ * Any one table type is enough to test this as we are validating the metadata table.
+ */
+ @Test
+ def testSecondaryIndexWithMDTCompaction(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2")
+ val tableName = "test_secondary_index_with_mdt_compaction"
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | ts bigint,
+ | record_key_col string,
+ | not_record_key_col string,
+ | partition_key_col string
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | type = 'mor',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+ | ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '2'
+ | )
+ | partitioned by(partition_key_col)
+ | location '$basePath'
+ """.stripMargin)
+ // by setting small file limit to 0, each insert will create a new file
+ // need to generate more files for a non-partitioned table to test data skipping
+ // as the partitioned table will have only one file per partition
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+ spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+ // create secondary index
+ spark.sql(s"create index idx_not_record_key_col on $tableName using
secondary_index(not_record_key_col)")
+ // validate index created successfully
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+
+ // do another insert and validate compaction in metadata table
+ spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+ val metadataTableFSView = HoodieSparkTable.create(getWriteConfig(hudiOpts), context()).getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
+ try {
+ val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
+ val lastCompactionInstant = compactionTimeline
+ .filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
+ HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
+ .getOperationType == WriteOperationType.COMPACT))
+ .lastInstant()
+ val compactionBaseFile = metadataTableFSView.getAllBaseFiles("secondary_index_idx_not_record_key_col")
+ .filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp)))
+ .findAny()
+ assertTrue(compactionBaseFile.isPresent)
+ } finally {
+ metadataTableFSView.close()
+ }
+
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from
hudi_metadata('$basePath') where type=7")(
+ Seq("abc", "row1"),
+ Seq("cde", "row2"),
+ Seq("def", "row3")
+ )
+ // validate data skipping with filters on secondary key column
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+ Seq(1, "row1", "abc", "p1")
+ )
+ }
+ }
+
private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}