linliu-code commented on code in PR #13738:
URL: https://github.com/apache/hudi/pull/13738#discussion_r2292116354


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -168,6 +184,162 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("providePayloadClassTestCases"))
+  def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
+                                                    payloadClazz: String,
+                                                    expectedConfigs: 
Map[String, String]): Unit = {
+    val opts: Map[String, String] = Map(
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+      HoodieMetadataConfig.ENABLE.key() -> "false")
+    val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq",
+      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+    // 1. Add an insert.
+    val data = Seq(
+      (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
+      (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1),
+      (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
+      (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
+      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    val orderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      "_event_bin_file,_event_pos"
+    } else {
+      "ts"
+    }
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "_event_lsn").
+      option(ORDERING_FIELDS.key(), orderingFields).
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+    // Verify table was created successfully
+    var metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    var tableConfig = metaClient.getTableConfig
+    // Verify table version is 9
+    assertEquals(9, tableConfig.getTableVersion.versionCode())
+    assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent)
+    // Verify table properties
+    expectedConfigs.foreach { case (key, expectedValue) =>
+      if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key 
should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be 
present")
+      }
+    }
+
+    // 2. Add an update.
+    val firstUpdateData = Seq(
+      (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1),
+      (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
+      (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1))
+    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
+    firstUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      mode(SaveMode.Append).
+      save(basePath)
+    // Validate table version.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+    val firstUpdateInstantTime = 
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
+
+
+    // 3. Add an update. This is expected to trigger the upgrade
+    val compactionEnabled = if 
(tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      "true"
+    } else {
+      "false"
+    }
+    val secondUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+      (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
+      (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
+      mode(SaveMode.Append).
+      save(basePath)
+    // Validate table version as 9.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+    assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
+    val compactionInstants = 
metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
+    val foundCompaction = compactionInstants.stream().anyMatch(i => 
i.getAction.equals("commit"))
+    assertTrue(foundCompaction)
+
+    // 4. Add a trivial update to trigger payload class mismatch.
+    val thirdUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1))
+    val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      assertThrows[HoodieException] {
+        thirdUpdate.write.format("hudi").
+          option(OPERATION.key(), "upsert").
+          option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+          
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+          option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
+            classOf[MySqlDebeziumAvroPayload].getName).
+          mode(SaveMode.Append).
+          save(basePath)
+      }
+    }
+
+    // 5. Add a delete.
+    val fourthUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),

Review Comment:
   We do. AwsDmsAvroPayload's result differs from that of the other payloads, since it 
uses the `Op` field to mark deletes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to