yihua commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1975943586


##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadat
       String action = factory instanceof InstantGeneratorV2 ? 
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, action, 
pendingReplaceOrClusterInstant.requestedTime());
     }
-    Option<byte[]> content = Option.empty();
     try {
-      content = timeline.getInstantDetails(requestedInstant);
+      // First assume the instant file is not empty and parse it.
+      return getHoodieRequestedReplaceMetadataOption(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant);
+    } catch (Exception ex) {
+      // If anything goes wrong, check if this is empty file.
+      if (isEmptyReplaceOrClusteringInstant(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+        return Option.empty();
+      }
+      // If still no luck, throw the exception.
+      throw ex;
+    }
+  }
+
+  private static Option<HoodieRequestedReplaceMetadata> 
getHoodieRequestedReplaceMetadataOption(

Review Comment:
   ```suggestion
     private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadataOption(
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadat
       String action = factory instanceof InstantGeneratorV2 ? 
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, action, 
pendingReplaceOrClusterInstant.requestedTime());
     }
-    Option<byte[]> content = Option.empty();
     try {
-      content = timeline.getInstantDetails(requestedInstant);
+      // First assume the instant file is not empty and parse it.
+      return getHoodieRequestedReplaceMetadataOption(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant);
+    } catch (Exception ex) {
+      // If anything goes wrong, check if this is empty file.
+      if (isEmptyReplaceOrClusteringInstant(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+        return Option.empty();
+      }
+      // If still no luck, throw the exception.
+      throw ex;
+    }
+  }
+
+  private static Option<HoodieRequestedReplaceMetadata> 
getHoodieRequestedReplaceMetadataOption(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, 
InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
+    try {
+      return 
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
     } catch (HoodieIOException e) {
       if (e.getCause() instanceof FileNotFoundException && 
pendingReplaceOrClusterInstant.isCompleted()) {
         // For clustering instants, completed instant is also a replace commit 
instant. For input replace commit instant,
         // it is not known whether requested instant is CLUSTER or 
REPLACE_COMMIT_ACTION. So we need to query both.
         requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, 
pendingReplaceOrClusterInstant.requestedTime());
-        content = timeline.getInstantDetails(requestedInstant);
+        return 
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
       }
+      throw e;
     }
-    if (!content.isPresent() || content.get().length == 0) {
-      // few operations create requested file without any content. Assume 
these are not clustering
-      return Option.empty();
+  }
+
+  private static boolean isEmptyReplaceOrClusteringInstant(

Review Comment:
   Similarly, we should remove util methods like this one and unify them into 
the `HoodieTimeline` class (or another class) so the code is easier to 
maintain.  It's OK to keep this method for this PR.  Let's create a JIRA ticket 
to track the refactoring and clean-up.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadat
       String action = factory instanceof InstantGeneratorV2 ? 
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, action, 
pendingReplaceOrClusterInstant.requestedTime());
     }
-    Option<byte[]> content = Option.empty();
     try {
-      content = timeline.getInstantDetails(requestedInstant);
+      // First assume the instant file is not empty and parse it.
+      return getHoodieRequestedReplaceMetadataOption(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant);
+    } catch (Exception ex) {
+      // If anything goes wrong, check if this is empty file.
+      if (isEmptyReplaceOrClusteringInstant(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+        return Option.empty();
+      }
+      // If still no luck, throw the exception.
+      throw ex;
+    }
+  }
+
+  private static Option<HoodieRequestedReplaceMetadata> 
getHoodieRequestedReplaceMetadataOption(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, 
InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
+    try {
+      return 
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
     } catch (HoodieIOException e) {
       if (e.getCause() instanceof FileNotFoundException && 
pendingReplaceOrClusterInstant.isCompleted()) {
         // For clustering instants, completed instant is also a replace commit 
instant. For input replace commit instant,
         // it is not known whether requested instant is CLUSTER or 
REPLACE_COMMIT_ACTION. So we need to query both.
         requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, 
pendingReplaceOrClusterInstant.requestedTime());
-        content = timeline.getInstantDetails(requestedInstant);
+        return 
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
       }
+      throw e;
     }
-    if (!content.isPresent() || content.get().length == 0) {
-      // few operations create requested file without any content. Assume 
these are not clustering
-      return Option.empty();
+  }
+
+  private static boolean isEmptyReplaceOrClusteringInstant(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, 
InstantGenerator factory, HoodieInstant requestedInstant) {

Review Comment:
   ```suggestion
         HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, InstantGenerator instantGenerator, HoodieInstant requestedInstant) {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadat
       String action = factory instanceof InstantGeneratorV2 ? 
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, action, 
pendingReplaceOrClusterInstant.requestedTime());
     }
-    Option<byte[]> content = Option.empty();
     try {
-      content = timeline.getInstantDetails(requestedInstant);
+      // First assume the instant file is not empty and parse it.
+      return getHoodieRequestedReplaceMetadataOption(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant);
+    } catch (Exception ex) {
+      // If anything goes wrong, check if this is empty file.
+      if (isEmptyReplaceOrClusteringInstant(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+        return Option.empty();
+      }
+      // If still no luck, throw the exception.
+      throw ex;
+    }
+  }
+
+  private static Option<HoodieRequestedReplaceMetadata> 
getHoodieRequestedReplaceMetadataOption(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, 
InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
+    try {
+      return 
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
     } catch (HoodieIOException e) {
       if (e.getCause() instanceof FileNotFoundException && 
pendingReplaceOrClusterInstant.isCompleted()) {
         // For clustering instants, completed instant is also a replace commit 
instant. For input replace commit instant,
         // it is not known whether requested instant is CLUSTER or 
REPLACE_COMMIT_ACTION. So we need to query both.
         requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, 
pendingReplaceOrClusterInstant.requestedTime());
-        content = timeline.getInstantDetails(requestedInstant);
+        return 
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
       }
+      throw e;
     }
-    if (!content.isPresent() || content.get().length == 0) {
-      // few operations create requested file without any content. Assume 
these are not clustering
-      return Option.empty();
+  }
+
+  private static boolean isEmptyReplaceOrClusteringInstant(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, 
InstantGenerator factory, HoodieInstant requestedInstant) {
+    try {
+      return timeline.isEmpty(requestedInstant);
+    } catch (HoodieIOException e) {
+      if (e.getCause() instanceof FileNotFoundException && 
pendingReplaceOrClusterInstant.isCompleted()) {
+        // For clustering instants, completed instant is also a replace commit 
instant. For input replace commit instant,
+        // it is not known whether requested instant is CLUSTER or 
REPLACE_COMMIT_ACTION. So we need to query both.
+        requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, 
pendingReplaceOrClusterInstant.requestedTime());

Review Comment:
   We should use `getClusteringCommitRequestedInstant` for clustering here, 
because the file name of the requested instant for clustering differs from 
that of the replacecommit requested instant for the INSERT_OVERWRITE operation.
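
To illustrate the file-name concern, here is a minimal, self-contained sketch. It does not use the Hudi API: `RequestedInstantLookup`, `requestedFileName`, and `findRequestedFile` are hypothetical names, and the `<requestedTime>.<action>.requested` layout and the two action strings are assumptions made for illustration only. It shows why a lookup keyed only on the replacecommit name can miss a clustering requested file, and what probing both names looks like.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in, not part of Hudi.
final class RequestedInstantLookup {
  // Assumed action names, standing in for HoodieTimeline.CLUSTERING_ACTION
  // and HoodieTimeline.REPLACE_COMMIT_ACTION.
  static final String CLUSTERING_ACTION = "clustering";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  // Assumed file-name layout: <requestedTime>.<action>.requested
  static String requestedFileName(String requestedTime, String action) {
    return requestedTime + "." + action + ".requested";
  }

  // Probes the clustering name first, then the replacecommit name,
  // and returns the first file name present in the given set.
  static Optional<String> findRequestedFile(Set<String> timelineFiles, String requestedTime) {
    List<String> actions = Arrays.asList(CLUSTERING_ACTION, REPLACE_COMMIT_ACTION);
    for (String action : actions) {
      String candidate = requestedFileName(requestedTime, action);
      if (timelineFiles.contains(candidate)) {
        return Optional.of(candidate);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Set<String> files = new HashSet<>(Arrays.asList(
        "001.clustering.requested", "002.replacecommit.requested"));
    System.out.println(findRequestedFile(files, "001"));
    System.out.println(findRequestedFile(files, "002"));
    System.out.println(findRequestedFile(files, "003"));
  }
}
```

In the real code, the instant for each name would come from the instant generator rather than from string concatenation; the sketch only makes the two-name lookup explicit.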


