shangxinli commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3250603466
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +876,32 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD if not already persisted, so both validators (collect) and
+      // writeClient.commit() share the same materialized result without re-evaluation.
+      // shouldUnpersist is true when we created the cache here (storage level was NONE),
+      // so the finally block knows to release it.
+      boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      if (shouldUnpersist) {
+        writeStatusRDD.cache();
Review Comment:
Filed #18750 to track the `writeStatusValidator` migration as a follow-up.
Will pick it up after this PR lands.