This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c61c4c3cac [HUDI-7575] Avoid repeated fetching of pending replace instants (#10976)
9c61c4c3cac is described below

commit 9c61c4c3caca1d6bd18f447c5e0d595bca93a3df
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 26 01:27:50 2024 -0700

    [HUDI-7575] Avoid repeated fetching of pending replace instants (#10976)
---
 .../table/timeline/HoodieDefaultTimeline.java      | 32 ++++++++++++++++------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 50f339d6bdb..71d63f16952 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -63,6 +63,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   private List<HoodieInstant> instants;
   // for efficient #contains queries.
   private transient volatile Set<String> instantTimeSet;
+  // for efficient #isPendingClusterInstant queries
+  private transient volatile Set<String> pendingReplaceClusteringInstants;
   // for efficient #isBeforeTimelineStarts check.
   private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
   private String timelineHash;
@@ -540,14 +542,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 
   @Override
   public boolean isPendingClusterInstant(String instantTime) {
-    HoodieTimeline potentialTimeline = 
getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> 
i.getTimestamp().equals(instantTime));
-    if (potentialTimeline.countInstants() == 0) {
-      return false;
-    }
-    if (potentialTimeline.countInstants() > 1) {
-      throw new IllegalStateException("Multiple instants with same timestamp: 
" + potentialTimeline);
-    }
-    return ClusteringUtils.isClusteringInstant(this, 
potentialTimeline.firstInstant().get());
+    return getOrCreatePendingClusteringInstantSet().contains(instantTime);
   }
 
   @Override
@@ -576,6 +571,27 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return this.instantTimeSet;
   }
 
+  private Set<String> getOrCreatePendingClusteringInstantSet() {
+    if (this.pendingReplaceClusteringInstants == null) {
+      synchronized (this) {
+        if (this.pendingReplaceClusteringInstants == null) {
+          List<HoodieInstant> pendingReplaceInstants = 
getCommitsTimeline().filterPendingReplaceTimeline().getInstants();
+          // Validate that there are no instants with same timestamp
+          
pendingReplaceInstants.stream().collect(Collectors.groupingBy(HoodieInstant::getTimestamp)).forEach((timestamp,
 instants) -> {
+            if (instants.size() > 1) {
+              throw new IllegalStateException("Multiple instants with same 
timestamp: " + timestamp + " instants: " + instants);
+            }
+          });
+          // Filter replace commits down to those that are due to clustering
+          this.pendingReplaceClusteringInstants = 
pendingReplaceInstants.stream()
+              .filter(instant -> ClusteringUtils.isClusteringInstant(this, 
instant))
+              .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+        }
+      }
+    }
+    return this.pendingReplaceClusteringInstants;
+  }
+
   /**
    * Returns the first non savepoint commit on the timeline.
    */

Reply via email to