This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 90c429eb9dc5b0077727d13b3adbb48d072c2596 Author: Tim Brown <[email protected]> AuthorDate: Fri Apr 26 01:27:50 2024 -0700 [HUDI-7575] Avoid repeated fetching of pending replace instants (#10976) --- .../table/timeline/HoodieDefaultTimeline.java | 32 ++++++++++++++++------ 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 737ec0ca5d9..68cf428d364 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -61,6 +61,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline { private List<HoodieInstant> instants; // for efficient #contains queries. private transient volatile Set<String> instantTimeSet; + // for efficient #isPendingClusterInstant queries + private transient volatile Set<String> pendingReplaceClusteringInstants; // for efficient #isBeforeTimelineStarts check. private transient volatile Option<HoodieInstant> firstNonSavepointCommit; private String timelineHash; @@ -527,14 +529,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline { @Override public boolean isPendingClusterInstant(String instantTime) { - HoodieTimeline potentialTimeline = getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> i.getTimestamp().equals(instantTime)); - if (potentialTimeline.countInstants() == 0) { - return false; - } - if (potentialTimeline.countInstants() > 1) { - throw new IllegalStateException("Multiple instants with same timestamp: " + potentialTimeline); - } - return ClusteringUtils.isClusteringInstant(this, potentialTimeline.firstInstant().get()); + return getOrCreatePendingClusteringInstantSet().contains(instantTime); } @Override @@ -578,6 +573,27 @@ public class HoodieDefaultTimeline implements HoodieTimeline { return this.instantTimeSet; } + private Set<String> getOrCreatePendingClusteringInstantSet() { + if (this.pendingReplaceClusteringInstants == null) { + synchronized (this) { + if (this.pendingReplaceClusteringInstants == null) { + List<HoodieInstant> pendingReplaceInstants = getCommitsTimeline().filterPendingReplaceTimeline().getInstants(); + // Validate that there are no instants with same timestamp + pendingReplaceInstants.stream().collect(Collectors.groupingBy(HoodieInstant::getTimestamp)).forEach((timestamp, instants) -> { + if (instants.size() > 1) { + throw new IllegalStateException("Multiple instants with same timestamp: " + timestamp + " instants: " + instants); + } + }); + // Filter replace commits down to those that are due to clustering + this.pendingReplaceClusteringInstants = pendingReplaceInstants.stream() + .filter(instant -> ClusteringUtils.isClusteringInstant(this, instant)) + .map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); + } + } + } + return this.pendingReplaceClusteringInstants; + } + /** * Returns the first non savepoint commit on the timeline. */
