xushiyan commented on code in PR #6561:
URL: https://github.com/apache/hudi/pull/6561#discussion_r968958663
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -356,6 +360,16 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
LOG.info("Starting clustering at " + clusteringInstant);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+ if (clusteringMetadata.getWriteStatuses().isEmpty()) {
Review Comment:
To fit the validation into a better method hook within the write path, should it be done within `completeClustering()`? We also want to "cluster" the logic within reasonable method hooks.
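A minimal sketch of that suggestion, assuming the empty-result check moves out of `cluster()` and into the completion hook. `ClusteringFlowSketch` and `ClusteringResultSketch` are hypothetical stand-ins, not Hudi classes; `completeClustering` only borrows the method name mentioned above and does not reproduce its real signature.

```java
import java.util.List;

// Hypothetical stand-in for the clustering write result; not a Hudi class.
final class ClusteringResultSketch {
  final List<String> writeStatuses;

  ClusteringResultSketch(List<String> writeStatuses) {
    this.writeStatuses = writeStatuses;
  }
}

// Hypothetical flow: cluster() only orchestrates, and the validation lives in
// the completion hook so every caller that completes clustering gets the check.
final class ClusteringFlowSketch {
  static String cluster(String instant, List<String> producedStatuses) {
    ClusteringResultSketch result = new ClusteringResultSketch(producedStatuses);
    // No empty-result check here; completion owns it.
    return completeClustering(instant, result);
  }

  static String completeClustering(String instant, ClusteringResultSketch result) {
    if (result.writeStatuses.isEmpty()) {
      throw new IllegalStateException(
          "Clustering at instant " + instant + " produced no write statuses");
    }
    return "completed " + instant + " with " + result.writeStatuses.size() + " statuses";
  }
}
```

Here `ClusteringFlowSketch.cluster("001", List.of())` fails inside `completeClustering`, so any path that completes the instant without going through `cluster()` would still be validated.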
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -249,7 +249,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
- validateWriteResult(clusteringPlan, writeMetadata);
+ // if we don't cache the write statuses above, validation will call isEmpty which might retrigger the execution again.
Review Comment:
If we remove this call, then `validateWriteResult()` is no longer used. Can we clean it up? I saw a similar code path in `ClusteringCommitSink()`; we need to address it in the Flink clustering path too.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -249,7 +249,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
- validateWriteResult(clusteringPlan, writeMetadata);
+ // if we don't cache the write statuses above, validation will call isEmpty which might retrigger the execution again.
Review Comment:
This comment is confusing: do we cache the write statuses or not?
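Why the answer matters: on an uncached Spark RDD, every action (such as `isEmpty()` or a collect) re-evaluates the lineage, so an extra validation action can re-run the clustering write unless the statuses are persisted first. The sketch below models this with a hypothetical `LazyStatusesSketch` class (not a Spark or Hudi API). As with Spark's `cache()`, calling `cache()` here only marks the data for reuse, and the first action after that stores the result.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of a lazily evaluated collection; not a Spark or Hudi API.
final class LazyStatusesSketch {
  private final Supplier<List<String>> compute; // the expensive "write" computation
  private boolean persist = false;              // set by cache()
  private List<String> stored;                  // filled by the first action after cache()
  private int runCount = 0;                     // how many times compute actually ran

  LazyStatusesSketch(Supplier<List<String>> compute) {
    this.compute = compute;
  }

  /** Lazy, like Spark's cache(): only marks the data to be kept after the next action. */
  LazyStatusesSketch cache() {
    persist = true;
    return this;
  }

  private List<String> evaluate() {
    if (stored != null) {
      return stored; // reuse the materialized result, no recomputation
    }
    runCount++;
    List<String> result = compute.get();
    if (persist) {
      stored = result;
    }
    return result;
  }

  // "Actions": each one forces evaluation.
  boolean isEmpty() {
    return evaluate().isEmpty();
  }

  List<String> collectAsList() {
    return evaluate();
  }

  int executions() {
    return runCount;
  }
}
```

With the uncached instance, a collect followed by `isEmpty()` runs the computation twice; with `cache()` called first, it runs once. That distinction is presumably what the code comment is trying to convey, so it could say explicitly which case applies.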
--
This is an automated message from the Apache Git Service.