rahil-c commented on code in PR #14001:
URL: https://github.com/apache/hudi/pull/14001#discussion_r2386697121
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -114,15 +129,65 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
+ option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "3").
+ option(HoodieCleanConfig.AUTO_CLEAN.key(), "false").
+ option(HoodieArchivalConfig.AUTO_ARCHIVE.key(), "true").
+ option(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE.key(), "1").
+ option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2").
+ option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3").
+ option(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true").
+ option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2").
+ option(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), "512000").
+ option(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(), "512000").
options(opts).
mode(SaveMode.Append).
save(basePath)
// Validate table version.
- metaClient = HoodieTableMetaClient.reload(metaClient)
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
val firstUpdateInstantTime =
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
+ // 2.5. Add mixed ordering test data to validate proper ordering handling
Review Comment:
I have added the swap, but I'm not sure what you meant by the validation
being changed. The records currently get deleted as part of the flow
during this operation:
```
// 5. Add a delete.
val fourthUpdateData = Seq(
(12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
(12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
val fourthUpdate = spark.createDataFrame(fourthUpdateData).toDF(columns: _*)
fourthUpdate.write.format("hudi").
option(OPERATION.key(), "delete").
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
options(serviceOpts).
mode(SaveMode.Append).
save(basePath)
```
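For what it's worth, the delete semantics the snippet above relies on can be sketched as a simplified, pure-Scala model (hypothetical names throughout; Hudi's actual merge path with payloads and ordering fields is more involved — this only illustrates that a record whose key appears in the delete batch is dropped from the snapshot):

```scala
// Hypothetical, simplified model of a "delete" write resolving against the
// current table snapshot. Here the record key is assumed to be the composite
// of the first two columns of the test data (key, ts).
object DeleteFlowSketch {
  case class Rec(key: Int, ts: Long, rider: String)

  // Any snapshot record whose (key, ts) appears in the delete batch is
  // removed; ordering values do not matter for a hard delete.
  def applyDeletes(snapshot: Seq[Rec], deletes: Seq[Rec]): Seq[Rec] = {
    val deletedKeys = deletes.map(r => (r.key, r.ts)).toSet
    snapshot.filterNot(r => deletedKeys.contains((r.key, r.ts)))
  }
}
```

So after the `"delete"` commit above, a snapshot read would no longer return the rider-CC and rider-EE rows, which is the deletion the test flow depends on.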
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]