yihua commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1975943586
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
- Option<byte[]> content = Option.empty();
try {
- content = timeline.getInstantDetails(requestedInstant);
+ // First assume the instant file is not empty and parse it.
+ return getHoodieRequestedReplaceMetadataOption(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant);
+ } catch (Exception ex) {
+ // If anything goes wrong, check if this is empty file.
+ if (isEmptyReplaceOrClusteringInstant(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+ return Option.empty();
+ }
+ // If still no luck, throw the exception.
+ throw ex;
+ }
+ }
+
+ private static Option<HoodieRequestedReplaceMetadata>
getHoodieRequestedReplaceMetadataOption(
Review Comment:
```suggestion
private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadataOption(
```
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
- Option<byte[]> content = Option.empty();
try {
- content = timeline.getInstantDetails(requestedInstant);
+ // First assume the instant file is not empty and parse it.
+ return getHoodieRequestedReplaceMetadataOption(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant);
+ } catch (Exception ex) {
+ // If anything goes wrong, check if this is empty file.
+ if (isEmptyReplaceOrClusteringInstant(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+ return Option.empty();
+ }
+ // If still no luck, throw the exception.
+ throw ex;
+ }
+ }
+
+ private static Option<HoodieRequestedReplaceMetadata>
getHoodieRequestedReplaceMetadataOption(
+ HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant,
InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
+ try {
+ return
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
} catch (HoodieIOException e) {
if (e.getCause() instanceof FileNotFoundException &&
pendingReplaceOrClusterInstant.isCompleted()) {
// For clustering instants, completed instant is also a replace commit
instant. For input replace commit instant,
// it is not known whether requested instant is CLUSTER or
REPLACE_COMMIT_ACTION. So we need to query both.
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION,
pendingReplaceOrClusterInstant.requestedTime());
- content = timeline.getInstantDetails(requestedInstant);
+ return
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
}
+ throw e;
}
- if (!content.isPresent() || content.get().length == 0) {
- // few operations create requested file without any content. Assume
these are not clustering
- return Option.empty();
+ }
+
+ private static boolean isEmptyReplaceOrClusteringInstant(
Review Comment:
Similarly, we should get rid of such util methods and unify them in the
`HoodieTimeline` class or another common class so the code is easier to
maintain. For this PR it's OK to keep this method. Let's create a JIRA ticket
to track the refactoring and clean-up.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
- Option<byte[]> content = Option.empty();
try {
- content = timeline.getInstantDetails(requestedInstant);
+ // First assume the instant file is not empty and parse it.
+ return getHoodieRequestedReplaceMetadataOption(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant);
+ } catch (Exception ex) {
+ // If anything goes wrong, check if this is empty file.
+ if (isEmptyReplaceOrClusteringInstant(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+ return Option.empty();
+ }
+ // If still no luck, throw the exception.
+ throw ex;
+ }
+ }
+
+ private static Option<HoodieRequestedReplaceMetadata>
getHoodieRequestedReplaceMetadataOption(
+ HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant,
InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
+ try {
+ return
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
} catch (HoodieIOException e) {
if (e.getCause() instanceof FileNotFoundException &&
pendingReplaceOrClusterInstant.isCompleted()) {
// For clustering instants, completed instant is also a replace commit
instant. For input replace commit instant,
// it is not known whether requested instant is CLUSTER or
REPLACE_COMMIT_ACTION. So we need to query both.
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION,
pendingReplaceOrClusterInstant.requestedTime());
- content = timeline.getInstantDetails(requestedInstant);
+ return
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
}
+ throw e;
}
- if (!content.isPresent() || content.get().length == 0) {
- // few operations create requested file without any content. Assume
these are not clustering
- return Option.empty();
+ }
+
+ private static boolean isEmptyReplaceOrClusteringInstant(
+ HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant,
InstantGenerator factory, HoodieInstant requestedInstant) {
Review Comment:
```suggestion
HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant,
InstantGenerator instantGenerator, HoodieInstant requestedInstant) {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
- Option<byte[]> content = Option.empty();
try {
- content = timeline.getInstantDetails(requestedInstant);
+ // First assume the instant file is not empty and parse it.
+ return getHoodieRequestedReplaceMetadataOption(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant);
+ } catch (Exception ex) {
+ // If anything goes wrong, check if this is empty file.
+ if (isEmptyReplaceOrClusteringInstant(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+ return Option.empty();
+ }
+ // If still no luck, throw the exception.
+ throw ex;
+ }
+ }
+
+ private static Option<HoodieRequestedReplaceMetadata>
getHoodieRequestedReplaceMetadataOption(
+ HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant,
InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
+ try {
+ return
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
} catch (HoodieIOException e) {
if (e.getCause() instanceof FileNotFoundException &&
pendingReplaceOrClusterInstant.isCompleted()) {
// For clustering instants, completed instant is also a replace commit
instant. For input replace commit instant,
// it is not known whether requested instant is CLUSTER or
REPLACE_COMMIT_ACTION. So we need to query both.
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION,
pendingReplaceOrClusterInstant.requestedTime());
- content = timeline.getInstantDetails(requestedInstant);
+ return
Option.of(timeline.loadRequestedReplaceMetadata(requestedInstant));
}
+ throw e;
}
- if (!content.isPresent() || content.get().length == 0) {
- // few operations create requested file without any content. Assume
these are not clustering
- return Option.empty();
+ }
+
+ private static boolean isEmptyReplaceOrClusteringInstant(
+ HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant,
InstantGenerator factory, HoodieInstant requestedInstant) {
+ try {
+ return timeline.isEmpty(requestedInstant);
+ } catch (HoodieIOException e) {
+ if (e.getCause() instanceof FileNotFoundException &&
pendingReplaceOrClusterInstant.isCompleted()) {
+ // For clustering instants, completed instant is also a replace commit
instant. For input replace commit instant,
+ // it is not known whether requested instant is CLUSTER or
REPLACE_COMMIT_ACTION. So we need to query both.
+ requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION,
pendingReplaceOrClusterInstant.requestedTime());
Review Comment:
We should use `getClusteringCommitRequestedInstant` for clustering here
because the file name of the requested instant for clustering is different from
that of the replacecommit requested instant for the INSERT_OVERWRITE operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]