This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.14.0-siva-0.14.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.14.0-siva-0.14.1 by
this push:
new 68f37119ad1 [HUDI-7199] Optimize contains impl with
HoodieDefaultTimeline (#10284)
68f37119ad1 is described below
commit 68f37119ad19bb42cc68f5b707d6de5a353831ab
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Dec 10 20:14:47 2023 -0800
[HUDI-7199] Optimize contains impl with HoodieDefaultTimeline (#10284)
---
.../table/timeline/HoodieDefaultTimeline.java | 55 ++++++++++++++++------
1 file changed, 41 insertions(+), 14 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 1f264955269..b170eb81865 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -55,6 +55,10 @@ public class HoodieDefaultTimeline implements HoodieTimeline
{
protected transient Function<HoodieInstant, Option<byte[]>> details;
private List<HoodieInstant> instants;
+ // Set of instant timestamps, built on first use, for efficient #containsInstant lookups.
+ private transient volatile Set<String> instantTimeSet;
+ // First non-savepoint commit, computed on first use and cached, for an efficient #isBeforeTimelineStarts check.
+ private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
private String timelineHash;
public HoodieDefaultTimeline(Stream<HoodieInstant> instants,
Function<HoodieInstant, Option<byte[]>> details) {
@@ -426,7 +430,7 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
@Override
public boolean containsInstant(String ts) {
// Check for 0.10.0+ timestamps which have msec granularity
- if (getInstantsAsStream().anyMatch(s -> s.getTimestamp().equals(ts))) {
+ if (getOrCreateInstantSet().contains(ts)) {
return true;
}
@@ -477,20 +481,14 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
}
public Option<HoodieInstant> getFirstNonSavepointCommit() {
- Option<HoodieInstant> firstCommit = firstInstant();
- Set<String> savepointTimestamps = getInstantsAsStream()
- .filter(entry ->
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
- .map(HoodieInstant::getTimestamp)
- .collect(Collectors.toSet());
- Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
- if (!savepointTimestamps.isEmpty()) {
- // There are chances that there could be holes in the timeline due to
archival and savepoint interplay.
- // So, the first non-savepoint commit is considered as beginning of the
active timeline.
- firstNonSavepointCommit = Option.fromJavaOptional(getInstantsAsStream()
- .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
- .findFirst());
+ if (this.firstNonSavepointCommit == null) {
+ synchronized (this) {
+ if (this.firstNonSavepointCommit == null) {
+ this.firstNonSavepointCommit =
findFirstNonSavepointCommit(this.instants);
+ }
+ }
}
- return firstNonSavepointCommit;
+ return this.firstNonSavepointCommit;
}
public Option<HoodieInstant> getLastClusterCommit() {
@@ -535,4 +533,33 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
};
return new HoodieDefaultTimeline(instantStream, details);
}
+
+ private Set<String> getOrCreateInstantSet() {
+ if (this.instantTimeSet == null) {
+ synchronized (this) {
+ if (this.instantTimeSet == null) {
+ this.instantTimeSet =
this.instants.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+ }
+ }
+ }
+ return this.instantTimeSet;
+ }
+
+ /**
+ * Returns the first non-savepoint commit among the given instants, or the first instant when there are no savepoints.
+ */
+ private static Option<HoodieInstant>
findFirstNonSavepointCommit(List<HoodieInstant> instants) {
+ Set<String> savepointTimestamps = instants.stream()
+ .filter(entry ->
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+ .map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toSet());
+ if (!savepointTimestamps.isEmpty()) {
+ // There are chances that there could be holes in the timeline due to
archival and savepoint interplay.
+ // So, the first non-savepoint commit is considered as beginning of the
active timeline.
+ return Option.fromJavaOptional(instants.stream()
+ .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+ .findFirst());
+ }
+ return Option.fromJavaOptional(instants.stream().findFirst());
+ }
}