This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9c61c4c3cac [HUDI-7575] Avoid repeated fetching of pending replace instants (#10976)
9c61c4c3cac is described below
commit 9c61c4c3caca1d6bd18f447c5e0d595bca93a3df
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 26 01:27:50 2024 -0700
[HUDI-7575] Avoid repeated fetching of pending replace instants (#10976)
---
.../table/timeline/HoodieDefaultTimeline.java | 32 ++++++++++++++++------
1 file changed, 24 insertions(+), 8 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 50f339d6bdb..71d63f16952 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -63,6 +63,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
private List<HoodieInstant> instants;
// for efficient #contains queries.
private transient volatile Set<String> instantTimeSet;
+ // for efficient #isPendingClusterInstant queries
+ private transient volatile Set<String> pendingReplaceClusteringInstants;
// for efficient #isBeforeTimelineStarts check.
private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
private String timelineHash;
@@ -540,14 +542,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public boolean isPendingClusterInstant(String instantTime) {
- HoodieTimeline potentialTimeline = getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> i.getTimestamp().equals(instantTime));
- if (potentialTimeline.countInstants() == 0) {
- return false;
- }
- if (potentialTimeline.countInstants() > 1) {
- throw new IllegalStateException("Multiple instants with same timestamp: " + potentialTimeline);
- }
- return ClusteringUtils.isClusteringInstant(this, potentialTimeline.firstInstant().get());
+ return getOrCreatePendingClusteringInstantSet().contains(instantTime);
}
@Override
@@ -576,6 +571,27 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
return this.instantTimeSet;
}
+ private Set<String> getOrCreatePendingClusteringInstantSet() {
+ if (this.pendingReplaceClusteringInstants == null) {
+ synchronized (this) {
+ if (this.pendingReplaceClusteringInstants == null) {
+ List<HoodieInstant> pendingReplaceInstants = getCommitsTimeline().filterPendingReplaceTimeline().getInstants();
+ // Validate that there are no instants with same timestamp
+ pendingReplaceInstants.stream().collect(Collectors.groupingBy(HoodieInstant::getTimestamp)).forEach((timestamp, instants) -> {
+ if (instants.size() > 1) {
+ throw new IllegalStateException("Multiple instants with same timestamp: " + timestamp + " instants: " + instants);
+ }
+ });
+ // Filter replace commits down to those that are due to clustering
+ this.pendingReplaceClusteringInstants = pendingReplaceInstants.stream()
+ .filter(instant -> ClusteringUtils.isClusteringInstant(this, instant))
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+ }
+ }
+ }
+ return this.pendingReplaceClusteringInstants;
+ }
+
/**
* Returns the first non savepoint commit on the timeline.
*/