This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.0-siva-0.14.1
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.14.0-siva-0.14.1 by this push:
     new 68f37119ad1 [HUDI-7199] Optimize contains impl with HoodieDefaultTimeline (#10284)
68f37119ad1 is described below

commit 68f37119ad19bb42cc68f5b707d6de5a353831ab
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Dec 10 20:14:47 2023 -0800

    [HUDI-7199] Optimize contains impl with HoodieDefaultTimeline (#10284)
---
 .../table/timeline/HoodieDefaultTimeline.java      | 55 ++++++++++++++++------
 1 file changed, 41 insertions(+), 14 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 1f264955269..b170eb81865 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -55,6 +55,10 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 
   protected transient Function<HoodieInstant, Option<byte[]>> details;
   private List<HoodieInstant> instants;
+  // for efficient #contains queries.
+  private transient volatile Set<String> instantTimeSet;
+  // for efficient #isBeforeTimelineStarts check.
+  private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
   private String timelineHash;
 
   public HoodieDefaultTimeline(Stream<HoodieInstant> instants, Function<HoodieInstant, Option<byte[]>> details) {
@@ -426,7 +430,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   @Override
   public boolean containsInstant(String ts) {
     // Check for 0.10.0+ timestamps which have msec granularity
-    if (getInstantsAsStream().anyMatch(s -> s.getTimestamp().equals(ts))) {
+    if (getOrCreateInstantSet().contains(ts)) {
       return true;
     }
 
@@ -477,20 +481,14 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   }
 
   public Option<HoodieInstant> getFirstNonSavepointCommit() {
-    Option<HoodieInstant> firstCommit = firstInstant();
-    Set<String> savepointTimestamps = getInstantsAsStream()
-        .filter(entry -> entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
-        .map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toSet());
-    Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
-    if (!savepointTimestamps.isEmpty()) {
-      // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
-      // So, the first non-savepoint commit is considered as beginning of the active timeline.
-      firstNonSavepointCommit = Option.fromJavaOptional(getInstantsAsStream()
-          .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
-          .findFirst());
+    if (this.firstNonSavepointCommit == null) {
+      synchronized (this) {
+        if (this.firstNonSavepointCommit == null) {
+          this.firstNonSavepointCommit = findFirstNonSavepointCommit(this.instants);
+        }
+      }
     }
-    return firstNonSavepointCommit;
+    return this.firstNonSavepointCommit;
   }
 
   public Option<HoodieInstant> getLastClusterCommit() {
@@ -535,4 +533,33 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     };
     return new HoodieDefaultTimeline(instantStream, details);
   }
+
+  private Set<String> getOrCreateInstantSet() {
+    if (this.instantTimeSet == null) {
+      synchronized (this) {
+        if (this.instantTimeSet == null) {
+          this.instantTimeSet = this.instants.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+        }
+      }
+    }
+    return this.instantTimeSet;
+  }
+
+  /**
+   * Returns the first non savepoint commit on the timeline.
+   */
+  private static Option<HoodieInstant> findFirstNonSavepointCommit(List<HoodieInstant> instants) {
+    Set<String> savepointTimestamps = instants.stream()
+        .filter(entry -> entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+    if (!savepointTimestamps.isEmpty()) {
+      // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+      // So, the first non-savepoint commit is considered as beginning of the active timeline.
+      return Option.fromJavaOptional(instants.stream()
+          .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+          .findFirst());
+    }
+    return Option.fromJavaOptional(instants.stream().findFirst());
+  }
 }

Reply via email to