codope commented on code in PR #13022:
URL: https://github.com/apache/hudi/pull/13022#discussion_r2012620794
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala:
##########
@@ -102,4 +110,260 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name,
metaClient.getTableConfig.getRecordMergeMode.name)
}
}
+
+ /**
+ * Test to ensure that the metadata compaction works as expected after a
downgrade.
+ * The test also validates that the delete blocks are correctly read post
downgrade.
+ */
+ @Test
+ def testMetadataCompactionWithDeleteBlockPostDowngrade() : Unit = {
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Common Hudi options for MERGE_ON_READ table with metadata and column
stats enabled.
+ val hudiOptions = Map[String, String](
+ "hoodie.table.name" -> tableName,
+ RECORDKEY_FIELD.key -> "id",
+ PRECOMBINE_FIELD.key -> "ts",
+ TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
+ OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+ KEYGENERATOR_CLASS_NAME.key ->
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "price",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "100" // ensure
MDT compaction does not run before downgrade
+ )
+
+ val _spark = spark
+ import _spark.implicits._
+
+ // ------------------------------------------------------------------
+ // Step 1 & 2: Create table and insert two records (initial commit)
+ // ------------------------------------------------------------------
+ println("== Step 1 & 2: Creating table and inserting initial records ==")
+ val initialDF = Seq(
+ (1, "Alice", 1000, 10), // (id, name, ts, price)
+ (2, "Bob", 1000, 20)
+ ).toDF("id", "name", "ts", "price")
+
+ initialDF.write.format("hudi")
+ .options(hudiOptions)
+ .mode("overwrite")
+ .save(basePath)
+
+ // ------------------------------------------------------------------
+ // Step 3: Update the records (generating a new log file with updated
stats)
+ // ------------------------------------------------------------------
+ println("== Step 3: Updating records ==")
+ val updateDF = Seq(
+ (1, "Alice", 2000, 15), // update Alice
+ (2, "Bob", 2000, 25) // update Bob
+ ).toDF("id", "name", "ts", "price")
+
+ updateDF.write.format("hudi")
+ .options(hudiOptions)
+ .mode("append")
+ .save(basePath)
+
+ // ------------------------------------------------------------------
+ // Step 4: Trigger compaction so that a new base file is produced.
+ // For inline compaction, we set inline compaction options.
+ // ------------------------------------------------------------------
+ val compactionOptions = hudiOptions ++ Map(
+ HoodieCompactionConfig.INLINE_COMPACT.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
+ // Writing the same updateDF triggers inline compaction in a MERGE_ON_READ
table.
+ updateDF.write.format("hudi")
+ .options(compactionOptions)
+ .mode("append")
+ .save(basePath)
+
+ // ------------------------------------------------------------------
+ // Step 5: Perform a clean operation.
+ // The clean removes older log files for column stats, leaving behind
delete blocks.
+ // ------------------------------------------------------------------
+ val cleanOptions = hudiOptions ++ Map(
+ HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key -> "1"
+ )
+ val client = DataSourceUtils.createHoodieClient(
+ spark.sparkContext, "", basePath, tableName, cleanOptions.asJava
+ ).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+ // Trigger cleaning via an additional write (which invokes cleaning)
+ val cleanInstant = client.scheduleTableService(Option.empty(),
TableServiceType.CLEAN)
+ client.clean(cleanInstant.get())
+ client.close()
+
+ // ------------------------------------------------------------------
+ // Step 6: Downgrade to version 6.
+ // ------------------------------------------------------------------
+ val downgradedOptions = hudiOptions ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
HoodieTableVersion.SIX.versionCode().toString
+ )
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ new UpgradeDowngrade(metaClient, getWriteConfig(downgradedOptions,
basePath), context, SparkUpgradeDowngradeHelper.getInstance)
+ .run(HoodieTableVersion.SIX, null)
+ // assert that metadata table version is 6 and it has been compacted
+ var metadataMetaClient =
HoodieTableMetaClient.builder().setConf(metaClient.getStorageConf).setBasePath(basePath
+ "/.hoodie/metadata").build()
+ assertEquals(HoodieTableVersion.SIX,
metadataMetaClient.getTableConfig.getTableVersion)
+ assertEquals(1,
metadataMetaClient.getActiveTimeline.getCommitTimeline.getInstants.size())
+ val compactionInstant =
metadataMetaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant().get()
+
+ // ------------------------------------------------------------------
+ // Step 7: Read the table using a predicate on 'price' to force the column
stats read path.
+ // This should trigger the error during deserialization of the delete
block.
+ // ------------------------------------------------------------------
+ val df = spark.read.format("hudi").load(basePath).filter("price > 15")
+ // assert the result
+ assertEquals(1, df.count())
+ // assert that only id=2 is present
+ assertEquals(1, df.filter("id = 2").count())
+
+ // ------------------------------------------------------------------
+ // Step 8: Do a commit with table version 6 but this time trigger MDT
compaction.
+ // ------------------------------------------------------------------
+ val writeConfigsPostDowngrade = hudiOptions ++ Map(
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
HoodieTableVersion.SIX.versionCode().toString
+ )
+ val updateDF2 = Seq(
+ (1, "Alice", 3000, 20), // update Alice
+ (2, "Bob", 3000, 30) // update Bob
+ ).toDF("id", "name", "ts", "price")
+ updateDF2.write.format("hudi")
+ .options(writeConfigsPostDowngrade)
+ .mode("append")
+ .save(basePath)
+ // assert that metadata table has been compacted
+ metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient)
+ val lastCompactionInstant =
metadataMetaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant().get()
+ assertTrue(compareTimestamps(lastCompactionInstant.requestedTime(),
GREATER_THAN_OR_EQUALS, compactionInstant.requestedTime()))
+ }
+
+ @Test
+ def testUpgradeDowngradeWithRecordIndex() : Unit = {
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Common Hudi options for a MERGE_ON_READ table with metadata and record
index enabled.
+ val hudiOptions = Map[String, String](
+ "hoodie.table.name" -> tableName,
+ RECORDKEY_FIELD.key -> "id",
+ PRECOMBINE_FIELD.key -> "ts",
+ TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
+ OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+ KEYGENERATOR_CLASS_NAME.key ->
classOf[NonpartitionedKeyGenerator].getName,
+ PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName,
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true", // enable
RECORD_INDEX
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "100", // ensure
compaction does not run before downgrade
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
HoodieTableVersion.SIX.versionCode().toString,
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false" // write in table
version 6 and disable auto upgrade
+ )
+
+ val _spark = spark
+ import _spark.implicits._
+
+ // ------------------------------------------------------------------
+ // Step 1 & 2: Create table and insert two records (initial commit)
+ // ------------------------------------------------------------------
+ println("== Step 1 & 2: Creating table and inserting initial records ==")
+ val initialDF = Seq(
+ (1, "Alice", 1000, 10), // (id, name, ts, price)
+ (2, "Bob", 1000, 20)
+ ).toDF("id", "name", "ts", "price")
+
+ initialDF.write.format("hudi")
+ .options(hudiOptions)
+ .mode("overwrite")
+ .save(basePath)
+
+ // ------------------------------------------------------------------
+ // Step 3: Update the records (generating a new log file with updated
stats)
+ // ------------------------------------------------------------------
+ println("== Step 3: Updating records ==")
+ val updateDF = Seq(
+ (1, "Alice", 2000, 15), // update Alice
+ (2, "Bob", 2000, 25) // update Bob
+ ).toDF("id", "name", "ts", "price")
+
+ updateDF.write.format("hudi")
+ .options(hudiOptions)
+ .mode("append")
+ .save(basePath)
+
+ // ------------------------------------------------------------------
+ // Step 4: Trigger compaction so that a new base file is produced.
+ // For inline compaction, we set inline compaction options.
+ // ------------------------------------------------------------------
+ val compactionOptions = hudiOptions ++ Map(
+ HoodieCompactionConfig.INLINE_COMPACT.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
+ // Writing the same updateDF triggers inline compaction in a MERGE_ON_READ
table.
+ updateDF.write.format("hudi")
+ .options(compactionOptions)
+ .mode("append")
+ .save(basePath)
+
+ // ------------------------------------------------------------------
+ // Step 5: Perform a clean operation.
+ // ------------------------------------------------------------------
+ val cleanOptions = hudiOptions ++ Map(
+ HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key -> "1"
+ )
+ val client = DataSourceUtils.createHoodieClient(
+ spark.sparkContext, "", basePath, tableName, cleanOptions.asJava
+ ).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+ // Trigger cleaning via an additional write (which invokes cleaning)
+ val cleanInstant = client.scheduleTableService(Option.empty(),
TableServiceType.CLEAN)
+ client.clean(cleanInstant.get())
+ client.close()
+
+ // ------------------------------------------------------------------
+ // Step 6: Upgrade to version 8.
+ // ------------------------------------------------------------------
+ val upgradedOptions = hudiOptions ++ Map(
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "true",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
HoodieTableVersion.EIGHT.versionCode().toString
+ )
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ new UpgradeDowngrade(metaClient, getWriteConfig(upgradedOptions,
basePath), context, SparkUpgradeDowngradeHelper.getInstance)
+ .run(HoodieTableVersion.EIGHT, null)
+ // assert table version
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(HoodieTableVersion.EIGHT,
metaClient.getTableConfig.getTableVersion)
+
+ // TODO: remove this once we have the fix from PR#13009
+ val props = metaClient.getTableConfig.getProps
+ props.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name())
+ HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath,
props)
Review Comment:
Had to set the merge mode in addition to the payload class. Just setting the
payload class does not automatically set the merge mode in the upgrade step.
I hope that will be fixed by #13009.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]