This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 90c429eb9dc5b0077727d13b3adbb48d072c2596
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 26 01:27:50 2024 -0700

    [HUDI-7575] Avoid repeated fetching of pending replace instants (#10976)
---
 .../table/timeline/HoodieDefaultTimeline.java      | 32 ++++++++++++++++------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 737ec0ca5d9..68cf428d364 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -61,6 +61,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   private List<HoodieInstant> instants;
   // for efficient #contains queries.
   private transient volatile Set<String> instantTimeSet;
+  // for efficient #isPendingClusterInstant queries
+  private transient volatile Set<String> pendingReplaceClusteringInstants;
   // for efficient #isBeforeTimelineStarts check.
   private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
   private String timelineHash;
@@ -527,14 +529,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   
   @Override
   public boolean isPendingClusterInstant(String instantTime) {
-    HoodieTimeline potentialTimeline = 
getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> 
i.getTimestamp().equals(instantTime));
-    if (potentialTimeline.countInstants() == 0) {
-      return false;
-    }
-    if (potentialTimeline.countInstants() > 1) {
-      throw new IllegalStateException("Multiple instants with same timestamp: 
" + potentialTimeline);
-    }
-    return ClusteringUtils.isClusteringInstant(this, 
potentialTimeline.firstInstant().get());
+    return getOrCreatePendingClusteringInstantSet().contains(instantTime);
   }
 
   @Override
@@ -578,6 +573,27 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return this.instantTimeSet;
   }
 
+  private Set<String> getOrCreatePendingClusteringInstantSet() {
+    if (this.pendingReplaceClusteringInstants == null) {
+      synchronized (this) {
+        if (this.pendingReplaceClusteringInstants == null) {
+          List<HoodieInstant> pendingReplaceInstants = 
getCommitsTimeline().filterPendingReplaceTimeline().getInstants();
+          // Validate that there are no instants with same timestamp
+          
pendingReplaceInstants.stream().collect(Collectors.groupingBy(HoodieInstant::getTimestamp)).forEach((timestamp,
 instants) -> {
+            if (instants.size() > 1) {
+              throw new IllegalStateException("Multiple instants with same 
timestamp: " + timestamp + " instants: " + instants);
+            }
+          });
+          // Filter replace commits down to those that are due to clustering
+          this.pendingReplaceClusteringInstants = 
pendingReplaceInstants.stream()
+              .filter(instant -> ClusteringUtils.isClusteringInstant(this, 
instant))
+              .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+        }
+      }
+    }
+    return this.pendingReplaceClusteringInstants;
+  }
+
   /**
    * Returns the first non savepoint commit on the timeline.
    */

Reply via email to