nsivabalan commented on code in PR #14001:
URL: https://github.com/apache/hudi/pull/14001#discussion_r2393589764


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -479,7 +814,13 @@ object TestPayloadDeprecationFlow {
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PartialUpdateAvroPayload].getName,
           HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> 
classOf[PartialUpdateAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,

Review Comment:
   The legacy payload config should have been part of the table config — can we 
double-check? 
   If it is not, the downgrade will fail. 
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -108,46 +122,57 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
       (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
       (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
-    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
-    firstUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
-      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
-      option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
-      options(opts).
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(firstUpdateData, columns, serviceOpts, opts, basePath,
+      tableVersion = Some("8"), orderingFields = Some(originalOrderingFields))
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
     val firstUpdateInstantTime = 
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
-    // 3. Add an update. This is expected to trigger the upgrade
+    // 3. Add mixed ordering test data to validate proper ordering handling
+    // This tests that updates/deletes with lower ordering values are ignored
+    // while higher ordering values are applied
+    val mixedOrderingData = Seq(
+      // Update rider-C with HIGHER ordering - should be APPLIED
+      (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+      // Update rider-C with LOWER ordering - should be IGNORED (rider-C has 
ts=10 originally)

Review Comment:
   Can we use a lower ordering value for a different rider? 
   Then we should have different expected results depending on whether it's the 
commit_time-based merge mode or the event_time-based merge mode. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -278,21 +337,36 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
       (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
       (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
-    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
-    firstUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(firstUpdateData, columns, serviceOpts, Map.empty, basePath)
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
     // validate ordering fields
     assertEquals(expectedOrderingFields, 
metaClient.getTableConfig.getOrderingFieldsStr.orElse(""))
     val firstUpdateInstantTime = 
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
+    // 3. Add mixed ordering test data to validate proper ordering handling
+    // This tests that updates/deletes with lower ordering values are ignored
+    // while higher ordering values are applied
+    val mixedOrderingData = Seq(
+      // Update rider-C with HIGHER ordering - should be APPLIED
+      (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+      // Update rider-C with LOWER ordering - should be IGNORED (rider-C has 
ts=10 originally)
+      (8, 3L, "rider-CC", "driver-CC", 30.00, "u", "8.1", 8, 1, "u"),

Review Comment:
   Same comment as above.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -622,7 +1043,13 @@ object TestPayloadDeprecationFlow {
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
           HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
-        )
+        ),
+        Map(

Review Comment:
   Can we split this into two sets of expectedConfigs: 
   one set for v8 and another set for v9 (i.e., after the upgrade)? 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -108,46 +122,57 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
       (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
       (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
-    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
-    firstUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
-      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
-      option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
-      options(opts).
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(firstUpdateData, columns, serviceOpts, opts, basePath,
+      tableVersion = Some("8"), orderingFields = Some(originalOrderingFields))
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
     val firstUpdateInstantTime = 
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
-    // 3. Add an update. This is expected to trigger the upgrade
+    // 3. Add mixed ordering test data to validate proper ordering handling
+    // This tests that updates/deletes with lower ordering values are ignored
+    // while higher ordering values are applied
+    val mixedOrderingData = Seq(
+      // Update rider-C with HIGHER ordering - should be APPLIED
+      (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+      // Update rider-C with LOWER ordering - should be IGNORED (rider-C has 
ts=10 originally)

Review Comment:
   I see that we are also setting the ordering field config for 
OverwriteWithLatestAvroPayload. 
   Can we unset that? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to