linliu-code commented on code in PR #13738:
URL: https://github.com/apache/hudi/pull/13738#discussion_r2292116354
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -168,6 +184,162 @@ class TestPayloadDeprecationFlow extends
SparkClientFunctionalTestHarness {
&& timeTravelDf.except(expectedTimeTravelDf).isEmpty)
}
+ @ParameterizedTest
+ @MethodSource(Array("providePayloadClassTestCases"))
+ def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
+ payloadClazz: String,
+ expectedConfigs:
Map[String, String]): Unit = {
+ val opts: Map[String, String] = Map(
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+ HoodieMetadataConfig.ENABLE.key() -> "false")
+ val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op",
"_event_seq",
+ DebeziumConstants.FLATTENED_FILE_COL_NAME,
DebeziumConstants.FLATTENED_POS_COL_NAME)
+ // 1. Add an insert.
+ val data = Seq(
+ (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
+ (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1),
+ (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
+ (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
+ (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+ val orderingFields = if
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ "_event_bin_file,_event_pos"
+ } else {
+ "ts"
+ }
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "_event_lsn").
+ option(ORDERING_FIELDS.key(), orderingFields).
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(basePath)
+ // Verify table was created successfully
+ var metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
+ var tableConfig = metaClient.getTableConfig
+ // Verify table version is 9
+ assertEquals(9, tableConfig.getTableVersion.versionCode())
+ assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent)
+ // Verify table properties
+ expectedConfigs.foreach { case (key, expectedValue) =>
+ if (expectedValue != null) {
+ assertEquals(expectedValue, tableConfig.getString(key), s"Config $key
should be $expectedValue")
+ } else {
+ assertFalse(tableConfig.contains(key), s"Config $key should not be
present")
+ }
+ }
+
+ // 2. Add an update.
+ val firstUpdateData = Seq(
+ (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1),
+ (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
+ (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1))
+ val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
+ firstUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ mode(SaveMode.Append).
+ save(basePath)
+ // Validate table version.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+ val firstUpdateInstantTime =
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
+
+
+ // 3. Add an update. This is expected to trigger the upgrade
+ val compactionEnabled = if
(tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ "true"
+ } else {
+ "false"
+ }
+ val secondUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+ (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
+ (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+ val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns:
_*)
+ secondUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
+ option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1").
+ mode(SaveMode.Append).
+ save(basePath)
+ // Validate table version as 9.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+ assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
+ val compactionInstants =
metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
+ val foundCompaction = compactionInstants.stream().anyMatch(i =>
i.getAction.equals("commit"))
+ assertTrue(foundCompaction)
+
+ // 4. Add a trivial update to trigger payload class mismatch.
+ val thirdUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1))
+ val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+ if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ assertThrows[HoodieException] {
+ thirdUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
+ classOf[MySqlDebeziumAvroPayload].getName).
+ mode(SaveMode.Append).
+ save(basePath)
+ }
+ }
+
+ // 5. Add a delete.
+ val fourthUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
Review Comment:
We do. AwsDmsAvroPayload's result is different from that of the other payloads
because it uses the `Op` field to indicate deletes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]