xushiyan commented on code in PR #6561:
URL: https://github.com/apache/hudi/pull/6561#discussion_r968958663


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -356,6 +360,16 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> 
cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = 
table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = 
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {

Review Comment:
   to fit the validation in a better method hook within the write path, should 
it be done within `completeClustering()` ? we also want to "cluster" the logic 
within reasonable method hooks.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -249,7 +249,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> 
executeClustering(HoodieC
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, 
writeMetadata);
     
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
     
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan,
 writeMetadata));
-    validateWriteResult(clusteringPlan, writeMetadata);
+    // if we don't cache the write statuses above, validation will call 
isEmpty which might retrigger the execution again.

Review Comment:
   if we remove this call, then the `validateWriteResult()` is not used any 
more. can we clean it up? i saw a similar code path in 
`ClusteringCommitSink()`. we need to address it in flink clustering path too



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -249,7 +249,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> 
executeClustering(HoodieC
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, 
writeMetadata);
     
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
     
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan,
 writeMetadata));
-    validateWriteResult(clusteringPlan, writeMetadata);
+    // if we don't cache the write statuses above, validation will call 
isEmpty which might retrigger the execution again.

Review Comment:
   this comment is confusing: so do we cache or not cache?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to