nsivabalan commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1803414355
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -86,13 +93,374 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // trigger one more upsert and compaction (w/ MOR table) and validate.
+ val expectedColStatsSourcePath1 = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+ }
+
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update4-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath1,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ "hoodie.write.markers.type" -> "DIRECT",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // simulate failure for latest commit.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var baseFileName : String = null
+ var logFileName : String = null
+ val lastCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/9/"))
+ }
+ val logFileFileStatus = dataFiles.stream().filter(fileStatus =>
fileStatus.getPath.getName.contains(".log")).findFirst().get()
+ logFileName = logFileFileStatus.getPath.getName
+ } else {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/9/"))
+ }
+ val baseFileFileStatus = dataFiles.stream().filter(fileStatus =>
fileStatus.getPath.getName.contains(lastCompletedCommit.getTimestamp)).findFirst().get()
+ baseFileName = baseFileFileStatus.getPath.getName
+ }
+
+ val latestCompletedFileName = lastCompletedCommit.getFileName
+ metaClient.getStorage.deleteFile(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/" +
latestCompletedFileName))
+
+ // re-create marker for the deleted file.
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/" + logFileName + ".marker.APPEND"))
+ } else {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/9/" + logFileName + ".marker.APPEND"))
+ }
+ } else {
+ if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/" + baseFileName + ".marker.MERGE"))
+ } else {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/9/" + baseFileName + ".marker.MERGE"))
+ }
+ }
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("", "c8"))
+ def testColStatsWithCleanCOW(partitionCol: String): Unit = {
Review Comment:
These are tests for clean validation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]