nsivabalan commented on code in PR #14001:
URL: https://github.com/apache/hudi/pull/14001#discussion_r2389998425


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -173,10 +244,52 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
       option(OPERATION.key(), "delete").
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+      option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "3").
+      option(HoodieCleanConfig.AUTO_CLEAN.key(), "false").
+      option(HoodieArchivalConfig.AUTO_ARCHIVE.key(), "true").
+      option(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE.key(), "1").
+      option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2").
+      option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3").
+      option(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true").
+      option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2").
+      option(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), "512000").
+      option(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(), "512000").
       mode(SaveMode.Append).
       save(basePath)
 
-    // 6. Validate.
+    // 6. Add INSERT operation.
+    val insertData = Seq(
+      (13, 6L, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+      (13, 7L, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+    val insertDataFrame = spark.createDataFrame(insertData).toDF(columns: _*)
+    insertDataFrame.write.format("hudi").
+      option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL).
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "3").
+      option(HoodieCleanConfig.AUTO_CLEAN.key(), "false").
+      option(HoodieArchivalConfig.AUTO_ARCHIVE.key(), "true").
+      option(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE.key(), "1").
+      option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2").
+      option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3").
+      option(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true").
+      option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2").
+      option(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), "512000").
+      option(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(), "512000").
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // Final validation of table management operations after all writes
+    metaClient = HoodieTableMetaClient.builder()

Review Comment:
   Fixing it as part of https://github.com/apache/hudi/pull/13987/ 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -173,10 +228,34 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
       option(OPERATION.key(), "delete").
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+      options(serviceOpts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // 6. Add INSERT operation.
+    val insertData = Seq(
+      (13, 6L, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+      (13, 7L, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+    val insertDataFrame = spark.createDataFrame(insertData).toDF(columns: _*)
+    insertDataFrame.write.format("hudi").

Review Comment:
   Again, we are writing very fat methods.
   Can we try to add private methods and use them here?
   
   For example:
   ```
   df.write.format("hudi").option(OPERATION.key(), writeOperation).
         option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
         options(serviceOpts).
         mode(SaveMode.Append).
         save(basePath)
   
   ```
   
   If we move the above code snippet into a private method, this main method would become very light.
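   A minimal sketch of the kind of private helper being suggested; the object name, method name, and option-key strings below are illustrative placeholders, not identifiers from the PR. The idea is to declare the shared table-service options once and merge per-call overrides on top, so each write call only states what differs.

```scala
// Sketch only: factor the repeated table-service options into one helper.
// Names and key strings are illustrative, not the PR's actual identifiers.
object WriteOptionHelpers {
  // Shared table-service options, declared once (mirrors `serviceOpts` in the test).
  val serviceOpts: Map[String, String] = Map(
    "hoodie.cleaner.commits.retained" -> "3",
    "hoodie.clustering.inline" -> "true")

  // Merge shared defaults with per-call overrides; the operation key always wins.
  def writeOpts(operation: String,
                extra: Map[String, String] = Map.empty): Map[String, String] =
    serviceOpts ++ extra + ("hoodie.datasource.write.operation" -> operation)
}
```

   A write in the test body would then shrink to something like `df.write.format("hudi").options(WriteOptionHelpers.writeOpts("upsert")).mode(SaveMode.Append).save(basePath)`.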



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -59,6 +61,20 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
       HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
 
+    // Common table service configurations
+    val serviceOpts: Map[String, String] = Map(
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "3",
+      HoodieCleanConfig.AUTO_CLEAN.key() -> "false",

Review Comment:
   Why disable clean? Are we explicitly triggering clean then?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -114,15 +134,47 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
       option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
       option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
+      options(serviceOpts).
       options(opts).
       mode(SaveMode.Append).
       save(basePath)
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
    val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
+    // 2.5. Add mixed ordering test data to validate proper ordering handling

Review Comment:
   What is `2.5`?
   Why can't we use `3` here and fix the numbering of the next set of bullets?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -282,15 +429,43 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
     firstUpdate.write.format("hudi").
       option(OPERATION.key(), "upsert").
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(serviceOpts).
       mode(SaveMode.Append).
       save(basePath)
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
     // validate ordering fields
    assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(""))
    val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
+    // 2.5. Add mixed ordering test data to validate proper ordering handling
+    // This tests that updates/deletes with lower ordering values are ignored
+    // while higher ordering values are applied
+    val mixedOrderingData = Seq(
+      // Update rider-C with HIGHER ordering - should be APPLIED
+      (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+      // Update rider-C with LOWER ordering - should be IGNORED (rider-C has ts=10 originally)
+      (8, 3L, "rider-CC", "driver-CC", 30.00, "u", "8.1", 8, 1, "u"),
+      // Delete rider-E with LOWER ordering - should be IGNORED (rider-E has ts=10 originally)
+      (9, 5L, "rider-EE", "driver-EE", 17.85, "D", "9.1", 9, 1, "d"))
+    val mixedOrderingUpdate = spark.createDataFrame(mixedOrderingData).toDF(columns: _*)
+    mixedOrderingUpdate.write.format("hudi").

Review Comment:
   Same comment as above.
   Let's try to reuse code as much as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
