nsivabalan commented on code in PR #13685:
URL: https://github.com/apache/hudi/pull/13685#discussion_r2279998688


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -110,6 +111,9 @@ private void updateMergeRelatedConfigs(Map<ConfigProperty, 
String> propertiesToA
         propertiesToRemove.add(
             ConfigProperty.key(MERGE_CUSTOM_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER).noDefaultValue());
       }
+      if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+        propertiesToRemove.add(HoodieTableConfig.PRECOMBINE_FIELDS);

Review Comment:
   This is not right.
   Isn't `_event_seq` the ordering field in V8?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java:
##########
@@ -250,6 +250,7 @@ public void run(HoodieTableVersion toVersion, String 
instantTime) {
     // Remove properties.
     Set<String> propertiesToRemove =
         
tablePropsToRemove.stream().map(ConfigProperty::key).collect(Collectors.toSet());
+    metaClient.getTableConfig().setTableVersion(toVersion);

Review Comment:
   Once we rebase onto the latest master, this should no longer be required.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala:
##########
@@ -109,6 +113,91 @@ class TestEightToNineUpgrade extends 
RecordLevelIndexTestBase {
     checkResultForVersion8(payloadClass)
   }
 
+  @Test
+  def testUpgradeDowngradeMySqlDebeziumPayload(): Unit = {
+    val payloadClass = classOf[MySqlDebeziumAvroPayload].getName
+    var opts: Map[String, String] = Map(
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClass,
+      HoodieMetadataConfig.ENABLE.key() -> "false"
+    )
+    val columns = Seq("ts", "key", "rider", "driver", 
DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME,
+      DebeziumConstants.ADDED_SEQ_COL_NAME)
+
+    // 1. Add an insert.
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 1, 1, "1.1"),
+      (10, "2", "rider-B", "driver-B", 2, 5, "2.5"),
+      (10, "3", "rider-C", "driver-C", 3, 10, "3.10"),
+      (10, "4", "rider-D", "driver-D", 4, 8, "4.8"),
+      (10, "5", "rider-E", "driver-E", 5, 4, "5.4"))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+    checkResultForVersion8(payloadClass)
+
+    // 2. Add an update and upgrade the table to v9
+    // first two records with larger ordering values based on debezium payload
+    // last two records with smaller ordering values based on debezium 
payload, below updates should be rejected
+    var updateData = Seq(
+      (9, "1", "rider-X", "driver-X", 1, 2, "1.2"),
+      (9, "2", "rider-Y", "driver-Y", 3, 2, "3.2"),
+      (9, "3", "rider-C", "driver-C", 2, 10, "2.10"),
+      (9, "4", "rider-D", "driver-D", 4, 7, "4.7")
+    )
+    var update = spark.createDataFrame(updateData).toDF(columns: _*)
+    update.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      mode(SaveMode.Append).
+      save(basePath)
+    checkResultForVersion9("", payloadClass)

Review Comment:
   Can we add data validation after the upgrade?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to