nsivabalan commented on code in PR #13022:
URL: https://github.com/apache/hudi/pull/13022#discussion_r2012722060


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala:
##########
@@ -102,4 +110,260 @@ class TestSevenToEightUpgrade extends 
RecordLevelIndexTestBase {
       assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, 
metaClient.getTableConfig.getRecordMergeMode.name)
     }
   }
+
+  /**
+   * Test to ensure that the metadata compaction works as expected after a 
downgrade.
+   * The test also validates that the delete blocks are correctly read post 
downgrade.
+   */
+  @Test
+  def testMetadataCompactionWithDeleteBlockPostDowngrade() : Unit = {
+    initMetaClient(HoodieTableType.MERGE_ON_READ)
+    // Common Hudi options for MERGE_ON_READ table with metadata and column 
stats enabled.
+    val hudiOptions = Map[String, String](
+      "hoodie.table.name" -> tableName,
+      RECORDKEY_FIELD.key -> "id",
+      PRECOMBINE_FIELD.key -> "ts",
+      TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
+      OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+      KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "price",
+      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "100" // ensure 
MDT compaction does not run before downgrade
+    )
+
+    val _spark = spark
+    import _spark.implicits._
+
+    // ------------------------------------------------------------------
+    // Step 1 & 2: Create table and insert two records (initial commit)
+    // ------------------------------------------------------------------
+    println("== Step 1 & 2: Creating table and inserting initial records ==")
+    val initialDF = Seq(
+      (1, "Alice", 1000, 10),  // (id, name, ts, price)
+      (2, "Bob",   1000, 20)
+    ).toDF("id", "name", "ts", "price")
+
+    initialDF.write.format("hudi")
+      .options(hudiOptions)
+      .mode("overwrite")
+      .save(basePath)
+
+    // ------------------------------------------------------------------
+    // Step 3: Update the records (generating a new log file with updated 
stats)
+    // ------------------------------------------------------------------
+    println("== Step 3: Updating records ==")
+    val updateDF = Seq(
+      (1, "Alice", 2000, 15),  // update Alice
+      (2, "Bob",   2000, 25)   // update Bob
+    ).toDF("id", "name", "ts", "price")
+
+    updateDF.write.format("hudi")
+      .options(hudiOptions)
+      .mode("append")
+      .save(basePath)
+
+    // ------------------------------------------------------------------
+    // Step 4: Trigger compaction so that a new base file is produced.
+    // For inline compaction, we set inline compaction options.
+    // ------------------------------------------------------------------
+    val compactionOptions = hudiOptions ++ Map(
+      HoodieCompactionConfig.INLINE_COMPACT.key -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+    )
+    // Writing the same updateDF triggers inline compaction in a MERGE_ON_READ 
table.
+    updateDF.write.format("hudi")
+      .options(compactionOptions)
+      .mode("append")
+      .save(basePath)
+
+    // ------------------------------------------------------------------
+    // Step 5: Perform a clean operation.
+    // The clean removes older log files for column stats, leaving behind 
delete blocks.
+    // ------------------------------------------------------------------
+    val cleanOptions = hudiOptions ++ Map(
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key -> "1"
+    )
+    val client = DataSourceUtils.createHoodieClient(
+      spark.sparkContext, "", basePath, tableName, cleanOptions.asJava
+    ).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+    // Trigger cleaning via an additional write (which invokes cleaning)
+    val cleanInstant = client.scheduleTableService(Option.empty(), 
TableServiceType.CLEAN)
+    client.clean(cleanInstant.get())
+    client.close()
+
+    // ------------------------------------------------------------------
+    // Step 6: Downgrade to version 6.
+    // ------------------------------------------------------------------
+    val downgradedOptions = hudiOptions ++ Map(
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> 
HoodieTableVersion.SIX.versionCode().toString
+    )
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    new UpgradeDowngrade(metaClient, getWriteConfig(downgradedOptions, 
basePath), context, SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.SIX, null)
+    // assert that metadata table version is 6 and it has been compacted
+    var metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(metaClient.getStorageConf).setBasePath(basePath
 + "/.hoodie/metadata").build()
+    assertEquals(HoodieTableVersion.SIX, 
metadataMetaClient.getTableConfig.getTableVersion)
+    assertEquals(1, 
metadataMetaClient.getActiveTimeline.getCommitTimeline.getInstants.size())
+    val compactionInstant = 
metadataMetaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant().get()
+
+    // ------------------------------------------------------------------
+    // Step 7: Read the table using a predicate on 'price' to force the column 
stats read path.
+    // This should trigger the error during deserialization of the delete 
block.
+    // ------------------------------------------------------------------
+    val df = spark.read.format("hudi").load(basePath).filter("price > 15")
+    // assert the result
+    assertEquals(1, df.count())
+    // assert that only id=2 is present
+    assertEquals(1, df.filter("id = 2").count())
+
+    // ------------------------------------------------------------------
+    // Step 8: Do a commit with table version 6 but this time trigger MDT 
compaction.
+    // ------------------------------------------------------------------
+    val writeConfigsPostDowngrade = hudiOptions ++ Map(
+      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> 
HoodieTableVersion.SIX.versionCode().toString
+    )
+    val updateDF2 = Seq(
+      (1, "Alice", 3000, 20),  // update Alice
+      (2, "Bob",   3000, 30)   // update Bob
+    ).toDF("id", "name", "ts", "price")
+    updateDF2.write.format("hudi")
+      .options(writeConfigsPostDowngrade)
+      .mode("append")
+      .save(basePath)
+    // assert that metadata table has been compacted
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient)
+    val lastCompactionInstant = 
metadataMetaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant().get()
+    assertTrue(compareTimestamps(lastCompactionInstant.requestedTime(), 
GREATER_THAN_OR_EQUALS, compactionInstant.requestedTime()))
+  }
+
+  @Test
+  def testUpgradeDowngradeWithRecordIndex() : Unit = {

Review Comment:
   don't we have any other tests that aleady exists for upgrade, downgrade 
scenario is my question (even if its not the new test you have written). we 
should have existing tests right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to