boneanxs commented on code in PR #9472:
URL: https://github.com/apache/hudi/pull/9472#discussion_r1298256125


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ReplaceCommitValidateUtil.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.LOG;
+
+public class ReplaceCommitValidateUtil {
+  public static void validateReplaceCommit(HoodieTableMetaClient metaClient) {
+    metaClient.reloadActiveTimeline();
+    Set<String> replaceFileids = new HashSet<>();
+
+    // Verify pending and completed replace commit
+    
Stream.concat(metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().stream(),
+        
metaClient.getActiveTimeline().filterInflights().getInstants().stream()).parallel().map(instant
 -> {

Review Comment:
   Do we only need the inflight replace commits here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ReplaceCommitValidateUtil.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.LOG;
+
+public class ReplaceCommitValidateUtil {
+  public static void validateReplaceCommit(HoodieTableMetaClient metaClient) {
+    metaClient.reloadActiveTimeline();
+    Set<String> replaceFileids = new HashSet<>();
+
+    // Verify pending and completed replace commit
+    
Stream.concat(metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().stream(),
+        
metaClient.getActiveTimeline().filterInflights().getInstants().stream()).parallel().map(instant
 -> {
+      try {

Review Comment:
   I understand the intention: deserializing the commit metadata might be
costly. Still, I prefer a sequential stream over a parallel one. The degree of
parallelism is not under our control, and merging the results is usually costly.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -238,10 +248,23 @@ protected abstract Iterator<List<WriteStatus>> 
handleUpdate(String partitionPath
 
   protected HoodieWriteMetadata<HoodieData<WriteStatus>> 
executeClustering(HoodieClusteringPlan clusteringPlan) {

Review Comment:
   It looks like this is only fixed for clustering/delete. Could two
concurrent `insert overwrite` queries also hit the same issue?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ReplaceCommitValidateUtil.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.LOG;
+
+public class ReplaceCommitValidateUtil {
+  public static void validateReplaceCommit(HoodieTableMetaClient metaClient) {
+    metaClient.reloadActiveTimeline();
+    Set<String> replaceFileids = new HashSet<>();
+
+    // Verify pending and completed replace commit
+    
Stream.concat(metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().stream(),
+        
metaClient.getActiveTimeline().filterInflights().getInstants().stream()).parallel().map(instant
 -> {
+      try {
+        HoodieReplaceCommitMetadata replaceCommitMetadata =
+            
HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
+                HoodieReplaceCommitMetadata.class);
+        if (!instant.isCompleted()) {
+          return 
JsonUtils.getObjectMapper().readValue(replaceCommitMetadata.getExtraMetadata().getOrDefault("replaceCommitFileIds",
 ""), String[].class);
+        } else {
+          return 
replaceCommitMetadata.getPartitionToReplaceFileIds().values().stream()
+              .flatMap(List::stream)
+              .toArray(String[]::new);
+        }
+      } catch (IOException e) {

Review Comment:
   Should we also rethrow the error here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ReplaceCommitValidateUtil.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.LOG;
+
+public class ReplaceCommitValidateUtil {
+  public static void validateReplaceCommit(HoodieTableMetaClient metaClient) {
+    metaClient.reloadActiveTimeline();
+    Set<String> replaceFileids = new HashSet<>();
+
+    // Verify pending and completed replace commit
+    
Stream.concat(metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().stream(),
+        
metaClient.getActiveTimeline().filterInflights().getInstants().stream()).parallel().map(instant
 -> {
+      try {
+        HoodieReplaceCommitMetadata replaceCommitMetadata =
+            
HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
+                HoodieReplaceCommitMetadata.class);
+        if (!instant.isCompleted()) {
+          return 
JsonUtils.getObjectMapper().readValue(replaceCommitMetadata.getExtraMetadata().getOrDefault("replaceCommitFileIds",
 ""), String[].class);
+        } else {

Review Comment:
   It would be better to make "replaceCommitFileIds" a static constant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to