This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7863348c526 [HUDI-6678] Fix the acquisition of clean&rollback instants
to archive (#9416)
7863348c526 is described below
commit 7863348c52656788c68613dcd403659436347b75
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Sep 17 07:44:08 2023 +0800
[HUDI-6678] Fix the acquisition of clean&rollback instants to archive
(#9416)
The current `getCleanInstantsToArchive` filters clean and rollback instants
separately, per action type, using maxInstantsToKeep and minInstantsToKeep. This
has two disadvantages: (1) if a user has only a few rollback instants (too few to
reach the archival threshold), they stay in the active timeline forever, even
after the commit timeline has advanced far beyond them, which can be very
confusing for users; (2) archiving clean and rollback instants independently of
commits causes holes in the active timeline. This commit
improves the logic [...]
---
.../client/timeline/HoodieTimelineArchiver.java | 244 +++++++++++----------
.../hudi/table/action/clean/CleanPlanner.java | 2 +-
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 176 ++++++---------
.../apache/hudi/common/util/ClusteringUtils.java | 8 +-
.../apache/hudi/common/util/CompactionUtils.java | 6 +-
.../hudi/common/util/TestClusteringUtils.java | 6 +-
.../hudi/common/util/TestCompactionUtils.java | 8 +-
7 files changed, 210 insertions(+), 240 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index 52e6e605594..dc761e23804 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -57,9 +56,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
@@ -128,140 +126,104 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
}
}
- private Stream<HoodieInstant> getCleanInstantsToArchive() {
+ private List<HoodieInstant>
getCleanAndRollbackInstantsToArchive(HoodieInstant
latestCommitInstantToArchive) {
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
-
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION,
HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
+
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION,
HoodieTimeline.ROLLBACK_ACTION))
+ .filterCompletedInstants();
+
+ // Since the commit instants to archive are contiguous, we can use the
latest commit instant to archive as the
+ // right boundary to collect the clean or rollback instants to archive.
+ //
+ //
latestCommitInstantToArchive
+ // v
+ // | commit1 clean1 commit2 commit3 clean2 commit4 rollback1 commit5 |
commit6 clean3 commit7 ...
+ // | <------------------ instants to archive --------------------> |
+ //
+ // CommitInstantsToArchive: commit1, commit2, commit3, commit4, commit5
+ // CleanAndRollbackInstantsToArchive: clean1, clean2, rollback1
+
return cleanAndRollbackTimeline.getInstantsAsStream()
-
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
- .map(hoodieInstants -> {
- if (hoodieInstants.size() > this.maxInstantsToKeep) {
- return hoodieInstants.subList(0, hoodieInstants.size() -
this.minInstantsToKeep);
- } else {
- return Collections.<HoodieInstant>emptyList();
- }
- }).flatMap(Collection::stream);
+ .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN,
latestCommitInstantToArchive.getTimestamp()))
+ .collect(Collectors.toList());
}
- private Stream<HoodieInstant> getCommitInstantsToArchive() throws
IOException {
- // TODO (na) : Add a way to return actions associated with a timeline and
then merge/unify
- // with logic above to avoid Stream.concat
- HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+ private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+ HoodieTimeline completedCommitsTimeline =
table.getCompletedCommitsTimeline();
+
+ if (completedCommitsTimeline.countInstants() <= maxInstantsToKeep) {
+ return Collections.emptyList();
+ }
+
+ // Step1: Get all candidates of earliestInstantToRetain.
+ List<Option<HoodieInstant>> earliestInstantToRetainCandidates = new
ArrayList<>();
- // Get the oldest inflight instant and a completed commit before this
inflight instant.
- Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
+ // 1. Earliest commit to retain is the greatest completed commit, that is
less than the earliest pending instant.
+ // In some cases when inflight is the lowest commit then earliest commit
to retain will be equal to the earliest
+ // inflight commit.
+ Option<HoodieInstant> earliestPendingInstant = table.getActiveTimeline()
.getWriteTimeline()
.filter(instant -> !instant.isCompleted())
.firstInstant();
- // Oldest commit to retain is the greatest completed commit, that is less
than the oldest pending instant.
- // In some cases when inflight is the lowest commit then oldest commit to
retain will be equal to oldest
- // inflight commit.
- Option<HoodieInstant> oldestCommitToRetain;
- if (oldestPendingInstant.isPresent()) {
- Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
- Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
- .filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
- LESSER_THAN,
oldestPendingInstant.get().getTimestamp())).findFirst());
- // Check if the completed instant is higher than the oldest inflight
instant
- // in that case update the oldestCommitToRetain to oldestInflight commit
time.
- if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
- oldestCommitToRetain = oldestPendingInstant;
+ Option<HoodieInstant> earliestCommitToRetain;
+ if (earliestPendingInstant.isPresent()) {
+ Option<HoodieInstant> completedCommitBeforeEarliestPendingInstant =
Option.fromJavaOptional(completedCommitsTimeline
+ .filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN,
earliestPendingInstant.get().getTimestamp()))
+ .getReverseOrderedInstants().findFirst());
+ // Check if the completed instant is higher than the earliest inflight
instant
+ // in that case update the earliestCommitToRetain to earliestInflight
commit time.
+ if (!completedCommitBeforeEarliestPendingInstant.isPresent()) {
+ earliestCommitToRetain = earliestPendingInstant;
} else {
- oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
+ earliestCommitToRetain = completedCommitBeforeEarliestPendingInstant;
}
} else {
- oldestCommitToRetain = Option.empty();
+ earliestCommitToRetain = Option.empty();
}
-
- // NOTE: We cannot have any holes in the commit timeline.
- // We cannot archive any commits which are made after the first savepoint
present,
- // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
- Option<HoodieInstant> firstSavepoint =
table.getCompletedSavepointTimeline().firstInstant();
- Set<String> savepointTimestamps = table.getSavepointTimestamps();
- if (!commitTimeline.empty() && commitTimeline.countInstants() >
maxInstantsToKeep) {
- // For Merge-On-Read table, inline or async compaction is enabled
- // We need to make sure that there are enough delta commits in the
active timeline
- // to trigger compaction scheduling, when the trigger strategy of
compaction is
- // NUM_COMMITS or NUM_AND_TIME.
- Option<HoodieInstant> oldestInstantToRetainForCompaction =
- (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
- && (config.getInlineCompactTriggerStrategy() ==
CompactionTriggerStrategy.NUM_COMMITS
- || config.getInlineCompactTriggerStrategy() ==
CompactionTriggerStrategy.NUM_AND_TIME))
- ? CompactionUtils.getOldestInstantToRetainForCompaction(
- table.getActiveTimeline(),
config.getInlineCompactDeltaCommitMax())
- : Option.empty();
-
- // The clustering commit instant can not be archived unless we ensure
that the replaced files have been cleaned,
- // without the replaced files metadata on the timeline, the fs view
would expose duplicates for readers.
- // Meanwhile, when inline or async clustering is enabled, we need to
ensure that there is a commit in the active timeline
- // to check whether the file slice generated in pending clustering after
archive isn't committed.
- Option<HoodieInstant> oldestInstantToRetainForClustering =
-
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
table.getMetaClient());
-
- // Actually do the commits
- Stream<HoodieInstant> instantToArchiveStream =
commitTimeline.getInstantsAsStream()
- .filter(s -> {
- if (config.shouldArchiveBeyondSavepoint()) {
- // skip savepoint commits and proceed further
- return !savepointTimestamps.contains(s.getTimestamp());
- } else {
- // if no savepoint present, then don't filter
- // stop at first savepoint commit
- return !(firstSavepoint.isPresent() &&
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS,
s.getTimestamp()));
- }
- }).filter(s -> {
- // oldestCommitToRetain is the highest completed commit instant
that is less than the oldest inflight instant.
- // By filtering out any commit >= oldestCommitToRetain, we can
ensure there are no gaps in the timeline
- // when inflight commits are present.
- return oldestCommitToRetain
- .map(instant -> compareTimestamps(instant.getTimestamp(),
GREATER_THAN, s.getTimestamp()))
- .orElse(true);
- }).filter(s ->
- oldestInstantToRetainForCompaction.map(instantToRetain ->
- compareTimestamps(s.getTimestamp(), LESSER_THAN,
instantToRetain.getTimestamp()))
- .orElse(true)
- ).filter(s ->
- oldestInstantToRetainForClustering.map(instantToRetain ->
- HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, instantToRetain.getTimestamp()))
- .orElse(true)
- );
- return instantToArchiveStream.limit(commitTimeline.countInstants() -
minInstantsToKeep);
- } else {
- return Stream.empty();
- }
- }
-
- private Stream<ActiveAction> getInstantsToArchive() throws IOException {
- if (config.isMetaserverEnabled()) {
- return Stream.empty();
- }
-
- // For archiving and cleaning instants, we need to include intermediate
state files if they exist
- HoodieActiveTimeline rawActiveTimeline = new
HoodieActiveTimeline(metaClient, false);
- Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction =
rawActiveTimeline.getInstantsAsStream()
- .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
- HoodieInstant.getComparableAction(i.getAction()))));
-
- Stream<HoodieInstant> instants =
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
-
- // If metadata table is enabled, do not archive instants which are more
recent than the last compaction on the
+ earliestInstantToRetainCandidates.add(earliestCommitToRetain);
+
+ // 2. For Merge-On-Read table, inline or async compaction is enabled
+ // We need to make sure that there are enough delta commits in the active
timeline
+ // to trigger compaction scheduling, when the trigger strategy of
compaction is
+ // NUM_COMMITS or NUM_AND_TIME.
+ Option<HoodieInstant> earliestInstantToRetainForCompaction =
+ (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
+ && (config.getInlineCompactTriggerStrategy() ==
CompactionTriggerStrategy.NUM_COMMITS
+ || config.getInlineCompactTriggerStrategy() ==
CompactionTriggerStrategy.NUM_AND_TIME))
+ ? CompactionUtils.getEarliestInstantToRetainForCompaction(
+ table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
+ : Option.empty();
+
earliestInstantToRetainCandidates.add(earliestInstantToRetainForCompaction);
+
+ // 3. The clustering commit instant can not be archived unless we ensure
that the replaced files have been cleaned,
+ // without the replaced files metadata on the timeline, the fs view would
expose duplicates for readers.
+ // Meanwhile, when inline or async clustering is enabled, we need to
ensure that there is a commit in the active timeline
+ // to check whether the file slice generated in pending clustering after
archive isn't committed.
+ Option<HoodieInstant> earliestInstantToRetainForClustering =
+
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(),
table.getMetaClient());
+
earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);
+
+ // 4. If metadata table is enabled, do not archive instants which are more
recent than the last compaction on the
// metadata table.
if (table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
try (HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
config.getBasePath())) {
Option<String> latestCompactionTime =
tableMetadata.getLatestCompactionTime();
if (!latestCompactionTime.isPresent()) {
LOG.info("Not archiving as there is no compaction yet on the
metadata table");
- instants = Stream.empty();
+ return Collections.emptyList();
} else {
LOG.info("Limiting archiving of instants to latest compaction on
metadata table at " + latestCompactionTime.get());
- instants = instants.filter(instant ->
compareTimestamps(instant.getTimestamp(), LESSER_THAN,
- latestCompactionTime.get()));
+ earliestInstantToRetainCandidates.add(Option.of(new HoodieInstant(
+ HoodieInstant.State.COMPLETED, COMPACTION_ACTION,
latestCompactionTime.get())));
}
} catch (Exception e) {
throw new HoodieException("Error limiting instant archival based on
metadata table", e);
}
}
+ // 5. If this is a metadata table, do not archive the commits that live in
data set
+ // active timeline. This is required by metadata table,
+ // see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (table.isMetadataTable()) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
@@ -282,16 +244,62 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
// CLEAN or ROLLBACK instant can block the archive of metadata table
timeline and causes
// the active timeline of metadata table to be extremely long, leading
to performance issues
// for loading the timeline.
- if (qualifiedEarliestInstant.isPresent()) {
- instants = instants.filter(instant ->
- compareTimestamps(
- instant.getTimestamp(),
- HoodieTimeline.LESSER_THAN,
- qualifiedEarliestInstant.get().getTimestamp()));
- }
+ earliestInstantToRetainCandidates.add(qualifiedEarliestInstant);
}
- return instants.map(hoodieInstant -> {
+ // Choose the instant in earliestInstantToRetainCandidates with the
smallest
+ // timestamp as earliestInstantToRetain.
+ java.util.Optional<HoodieInstant> earliestInstantToRetain =
earliestInstantToRetainCandidates
+ .stream()
+ .filter(Option::isPresent)
+ .map(Option::get)
+ .min(HoodieInstant.COMPARATOR);
+
+ // Step2: We cannot archive any commits which are made after the first
savepoint present,
+ // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+ Option<HoodieInstant> firstSavepoint =
table.getCompletedSavepointTimeline().firstInstant();
+ Set<String> savepointTimestamps = table.getSavepointTimestamps();
+
+ Stream<HoodieInstant> instantToArchiveStream =
completedCommitsTimeline.getInstantsAsStream()
+ .filter(s -> {
+ if (config.shouldArchiveBeyondSavepoint()) {
+ // skip savepoint commits and proceed further
+ return !savepointTimestamps.contains(s.getTimestamp());
+ } else {
+ // if no savepoint present, then don't filter
+ // stop at first savepoint commit
+ return !firstSavepoint.isPresent() ||
compareTimestamps(s.getTimestamp(), LESSER_THAN,
firstSavepoint.get().getTimestamp());
+ }
+ }).filter(s -> earliestInstantToRetain
+ .map(instant -> compareTimestamps(s.getTimestamp(), LESSER_THAN,
instant.getTimestamp()))
+ .orElse(true));
+ return
instantToArchiveStream.limit(completedCommitsTimeline.countInstants() -
minInstantsToKeep)
+ .collect(Collectors.toList());
+ }
+
+ private Stream<ActiveAction> getInstantsToArchive() throws IOException {
+ if (config.isMetaserverEnabled()) {
+ return Stream.empty();
+ }
+
+ // First get commit instants to archive.
+ List<HoodieInstant> instantsToArchive = getCommitInstantsToArchive();
+ if (!instantsToArchive.isEmpty()) {
+ HoodieInstant latestCommitInstantToArchive =
instantsToArchive.get(instantsToArchive.size() - 1);
+ // Then get clean and rollback instants to archive.
+ List<HoodieInstant> cleanAndRollbackInstantsToArchive =
+ getCleanAndRollbackInstantsToArchive(latestCommitInstantToArchive);
+ instantsToArchive.addAll(cleanAndRollbackInstantsToArchive);
+ instantsToArchive.sort(HoodieInstant.COMPARATOR);
+ }
+
+ // For archive, we need to include instant's all states.
+ HoodieActiveTimeline rawActiveTimeline = new
HoodieActiveTimeline(metaClient, false);
+ Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction =
rawActiveTimeline.getInstantsAsStream()
+ .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
+ HoodieInstant.getComparableAction(i.getAction()))));
+
+ return instantsToArchive.stream().map(hoodieInstant -> {
List<HoodieInstant> instantsToStream =
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
return ActiveAction.fromInstants(instantsToStream);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 86070844701..112034fd877 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -506,7 +506,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
}
/**
- * Returns earliest commit to retain based on cleaning policy.
+ * Returns the earliest commit to retain based on cleaning policy.
*/
public Option<HoodieInstant> getEarliestCommitToRetain() {
return CleanerUtils.getEarliestCommitToRetain(
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 0000c171839..724239a7738 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -832,12 +832,12 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
// only time when archival will kick in
List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000003")));
-
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000004"), HoodieTimeline.ROLLBACK_ACTION));
+
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000002"),
HoodieTimeline.ROLLBACK_ACTION));
List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
expectedActiveInstants.addAll(getActiveCommitInstants(
Arrays.asList("00000005", "00000007", "00000009", "00000011")));
expectedActiveInstants.addAll(getActiveCommitInstants(
- Arrays.asList("00000006", "00000008", "00000010", "00000012"),
HoodieTimeline.ROLLBACK_ACTION));
+ Arrays.asList("00000004", "00000006", "00000008", "00000010",
"00000012"), HoodieTimeline.ROLLBACK_ACTION));
verifyArchival(expectedArchivedInstants, expectedActiveInstants,
commitsAfterArchival);
}
}
@@ -977,25 +977,21 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws
Exception {
- HoodieWriteConfig writeConfig =
initTestTableAndGetWriteConfig(enableMetadata, 4, 5, 2);
-
- // min archival commits is 4 and max archival commits is 5
- // (either clean commits has to be > 5 or commits has to be greater than 5)
- // and so, after 6th instant, 2 instants will be archived.
- // 1,2,3,4,5,6 : after archival -> 1,3,4,5,6
- // (because, 2,3,4,5 and 6 are clean instants and are eligible for
archival)
- // after 7th and 9th instant no-op wrt archival. After 8th instant,
- // archival kicks in when metadata table is enabled.
+ HoodieWriteConfig writeConfig =
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 8);
+ // Min archival commits is 2 and max archival commits is 4.
+ // When metadata table is not enabled, after 5th write instant, archive
will be triggered.
+ // When metadata table is enabled, after 8th instant (6 write instants + 2
clean instants) >= maxDeltaCommitsMetadataTable,
+ // archival kicks in when compaction in metadata table triggered.
Map<String, Integer> cleanStats = new HashMap<>();
cleanStats.put("p1", 1);
cleanStats.put("p2", 2);
- for (int i = 1; i <= 10; i++) {
+ for (int i = 1; i <= 8; i++) {
if (i == 1) {
- testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i
== 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1",
"p2"), 20);
- } else if (i <= 7 || i == 9) {
- testTable.doClean("0000000" + i, cleanStats);
+ testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1",
"p2"), 20);
+ } else if (i <= 3) {
+ testTable.doClean(String.format("%08d", i), cleanStats);
} else {
- testTable.doWriteOperation("000000" + String.format("%02d", i),
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") :
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+ testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"),
2);
}
// trigger archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
archiveAndGetCommitsList(writeConfig);
@@ -1005,37 +1001,34 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
assertEquals(originalCommits, commitsAfterArchival);
} else if (i == 7) {
if (!enableMetadata) {
- // 1,2,3,4,5,6,7 : after archival -> 1,4,5,6,7 (bcoz, 2,3,4,5,6,7
are clean instants and are eligible for archival)
- List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001")));
- expectedActiveInstants.addAll(
- getActiveCommitInstants(Arrays.asList("00000004", "00000005",
"00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
- verifyArchival(getAllArchivedCommitInstants(
- Arrays.asList("00000002", "00000003"),
HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
+ // do archive:
+ // clean: 2,3: after archival -> null
+ // write: 1,4,5,6,7: after archival -> 6, 7
+ List<HoodieInstant> expectedActiveInstants = new
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000006", "00000007")));
+
+ List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000004", "00000005")));
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003"), HoodieTimeline.CLEAN_ACTION));
+
+ verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival);
} else {
- // with metadata enabled, archival in data table is fenced based on
compaction in metadata table. Clean commits in data table will not trigger
compaction in
- // metadata table.
- List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001")));
- expectedActiveInstants.addAll(getActiveCommitInstants(
- Arrays.asList("00000002", "00000003", "00000004", "00000005",
"00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
- verifyArchival(getAllArchivedCommitInstants(Collections.emptyList(),
HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
+ // with metadata enabled, archival in data table is fenced based on
compaction in metadata table.
+ assertEquals(originalCommits, commitsAfterArchival);
}
} else {
if (!enableMetadata) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
- if (i == 8) {
- // when i == 7 compaction in metadata table will be triggered
- // and after wards archival in datatable will kick in when i == 8.
- // 1,2,3,4,5,6,7,8 : after archival -> 1,4,5,6,7,8 (bcoz, 2,3,4,5
and 6 are clean commits and are eligible for archival)
- List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001",
"00000008")));
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000004",
"00000005", "00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
-
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants,
commitsAfterArchival);
- } else {
- assertEquals(originalCommits, commitsAfterArchival);
- }
+ // when i == 8 compaction in metadata table will be triggered, and
then allow archive:
+ // clean: 2,3: after archival -> null
+ // write: 1,4,5,6,7,8: after archival -> 7, 8
+ List<HoodieInstant> expectedActiveInstants = new
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
+
+ List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000004", "00000005", "00000006")));
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003"), HoodieTimeline.CLEAN_ACTION));
+
+ verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival);
}
}
}
@@ -1043,40 +1036,43 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
@Test
public void testArchiveRollbacksAndCleanTestTable() throws Exception {
- int minArchiveCommits = 4;
- int maxArchiveCommits = 9;
+ int minArchiveCommits = 2;
+ int maxArchiveCommits = 4;
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true,
minArchiveCommits, maxArchiveCommits, 2);
- // trigger 1 commit to add lot of files so that future cleans can clean
them up
- testTable.doWriteOperation("00000001", WriteOperationType.UPSERT,
Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
+ // trigger 1 commit to add a lot of files so that future cleans can clean
them up
+ testTable.doWriteOperation(String.format("%08d", 1),
WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1",
"p2"), 20);
Map<String, Integer> partitionToFileDeleteCount = new HashMap<>();
partitionToFileDeleteCount.put("p1", 1);
partitionToFileDeleteCount.put("p2", 1);
- // we are triggering 10 clean commits. (1 is commit, 2 -> 11 is clean)
- for (int i = 2; i <= (maxArchiveCommits + 2); i++) {
- testTable.doClean((i > 9 ? ("000000") : ("0000000")) + i,
partitionToFileDeleteCount);
+
+ for (int i = 2; i < 5; i++) {
+ testTable.doClean(String.format("%08d", i), partitionToFileDeleteCount);
}
- // we are triggering 7 commits and 7 rollbacks for the same
- for (int i = 12; i <= (2 * maxArchiveCommits); i += 2) {
- testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT,
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
- testTable.doRollback("000000" + i, "000000" + (i + 1));
+ for (int i = 5; i <= 11; i += 2) {
+ testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"),
2);
+ testTable.doRollback(String.format("%08d", i), String.format("%08d", i +
1));
}
- // trigger archival
+ // trigger archival:
+ // clean: 2,3: after archival -> null
+ // write: 1,5,7,9,11: after archival -> 9,11
+ // rollback: 6,8,10,12: after archival -> 8,10,12
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
archiveAndGetCommitsList(writeConfig);
- List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
- // out of 10 clean commits, 6 will be archived. 2 to 7. 8 to 11 will be
active.
- // wrt regular commits, there aren't 9 commits yet and so all of them will
be active.
List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008",
"00000009", "00000010", "00000011"), HoodieTimeline.CLEAN_ACTION));
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001",
"00000012", "00000014", "00000016", "00000018")));
-
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000013",
"00000015", "00000017", "00000019"), HoodieTimeline.ROLLBACK_ACTION));
- verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003", "00000004", "00000005", "00000006", "00000007"),
- HoodieTimeline.CLEAN_ACTION), expectedActiveInstants,
commitsAfterArchival);
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008",
"00000010", "00000012"), HoodieTimeline.ROLLBACK_ACTION));
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009",
"00000011")));
+
+ List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000005", "00000007")));
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003", "00000004"), HoodieTimeline.CLEAN_ACTION));
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000006"),
HoodieTimeline.ROLLBACK_ACTION));
+
+ verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival);
}
@ParameterizedTest
@@ -1098,13 +1094,13 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
int startInstant = 1;
List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
- createCleanMetadata(startInstant + "", false, false, isEmpty || i % 2 ==
0);
- expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED,
HoodieTimeline.CLEAN_ACTION, startInstant + ""));
+ createCleanMetadata(String.format("%02d", startInstant), false, false,
isEmpty || i % 2 == 0);
+ expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED,
HoodieTimeline.CLEAN_ACTION, String.format("%02d", startInstant)));
}
for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "",
false, isEmpty || i % 2 == 0);
- expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED,
HoodieTimeline.ROLLBACK_ACTION, startInstant + ""));
+ expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED,
HoodieTimeline.ROLLBACK_ACTION, String.format("%02d", startInstant)));
}
if (enableMetadataTable) {
@@ -1121,8 +1117,11 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
Stream<HoodieInstant> currentInstants =
metaClient.getActiveTimeline().reload().getInstantsAsStream();
Map<Object, List<HoodieInstant>> actionInstantMap =
currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
- assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must
be preset");
- assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(),
"Should have min instant");
+ // The commit order is: clean, clean, clean, ..., commit, rollback,
commit, rollback ...
+ // So after archive, actionInstantMap will contain commit and rollback,
+ // the number will be equal to minInstantsToKeep
+ assertTrue(actionInstantMap.containsKey("commit"), "Commit Action key must
be preset");
+ assertEquals(minInstantsToKeep, actionInstantMap.get("commit").size(),
"Should have min instant");
assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key
must be preset");
assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(),
"Should have min instant");
@@ -1136,43 +1135,6 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
expectedArchivedInstants.forEach(entry ->
assertTrue(metaClient.getArchivedTimeline().containsInstant(entry)));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testArchiveInflightClean(boolean enableMetadataTable) throws
Exception {
- init();
- HoodieWriteConfig cfg =
-
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
- .withParallelism(2, 2).forTable("test-trip-table")
-
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
-
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
- .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
- .withRemoteServerPort(timelineServicePort).build())
-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
- .build();
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- createCleanMetadata("10", false);
- createCleanMetadata("11", false);
- HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false);
- HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false);
- HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true);
-
- if (enableMetadataTable) {
- // Simulate a compaction commit in metadata table timeline
- // so the archival in data table can happen
- createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath,
"14");
- }
-
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
-
- archiver.archiveIfRequired(context);
-
- List<HoodieInstant> notArchivedInstants =
metaClient.getActiveTimeline().reload().getInstants();
- assertEquals(3, notArchivedInstants.size(), "Not archived instants should
be 3");
- assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1,
notArchivedInstant2, notArchivedInstant3), "");
- }
-
@Test
public void testArchiveTableWithMetadataTableCompaction() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5,
7);
@@ -1637,14 +1599,14 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
}
private void verifyArchival(List<HoodieInstant> expectedArchivedInstants,
List<HoodieInstant> expectedActiveInstants, List<HoodieInstant>
commitsAfterArchival) {
- Collections.sort(expectedActiveInstants,
Comparator.comparing(HoodieInstant::getTimestamp));
- Collections.sort(commitsAfterArchival,
Comparator.comparing(HoodieInstant::getTimestamp));
+
expectedActiveInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
+
commitsAfterArchival.sort(Comparator.comparing(HoodieInstant::getTimestamp));
assertEquals(expectedActiveInstants, commitsAfterArchival);
expectedArchivedInstants.forEach(entry ->
assertFalse(commitsAfterArchival.contains(entry)));
HoodieArchivedTimeline archivedTimeline = new
HoodieArchivedTimeline(metaClient);
List<HoodieInstant> actualArchivedInstants =
archivedTimeline.getInstants();
- Collections.sort(actualArchivedInstants,
Comparator.comparing(HoodieInstant::getTimestamp));
- Collections.sort(expectedArchivedInstants,
Comparator.comparing(HoodieInstant::getTimestamp));
+
actualArchivedInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
+
expectedArchivedInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
assertEquals(actualArchivedInstants, expectedArchivedInstants);
HoodieTimeline timeline = metaClient.getActiveTimeline();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index e50431c7398..894db11a2d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -236,14 +236,14 @@ public class ClusteringUtils {
}
/**
- * Returns the oldest instant to retain.
- * Make sure the clustering instant won't be archived before cleaned, and
the oldest inflight clustering instant has a previous commit.
+ * Returns the earliest instant to retain.
+ * Make sure the clustering instant won't be archived before cleaned, and
the earliest inflight clustering instant has a previous commit.
*
* @param activeTimeline The active timeline
* @param metaClient The meta client
- * @return the oldest instant to retain for clustering
+ * @return the earliest instant to retain for clustering
*/
- public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+ public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient)
throws IOException {
Option<HoodieInstant> oldestInstantToRetain = Option.empty();
HoodieTimeline replaceTimeline =
activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 0f41f1314e1..6b74dd869a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -339,7 +339,7 @@ public class CompactionUtils {
}
/**
- * Gets the oldest instant to retain for MOR compaction.
+ * Gets the earliest instant to retain for MOR compaction.
* If there is no completed compaction,
* num delta commits >= "hoodie.compact.inline.max.delta.commits"
* If there is a completed compaction,
@@ -348,9 +348,9 @@ public class CompactionUtils {
* @param activeTimeline Active timeline of a table.
* @param maxDeltaCommits Maximum number of delta commits that trigger the
compaction plan,
* i.e., "hoodie.compact.inline.max.delta.commits".
- * @return the oldest instant to keep for MOR compaction.
+ * @return the earliest instant to keep for MOR compaction.
*/
- public static Option<HoodieInstant> getOldestInstantToRetainForCompaction(
+ public static Option<HoodieInstant> getEarliestInstantToRetainForCompaction(
HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfoOption =
CompactionUtils.getDeltaCommitsSinceLatestCompaction(activeTimeline);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 4e76d25f41f..9028fe63fd4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -146,7 +146,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
HoodieInstant inflightInstant3 =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3,
Option.empty());
HoodieInstant completedInstant3 =
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3,
Option.empty());
metaClient.reloadActiveTimeline();
- Option<HoodieInstant> actual =
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ Option<HoodieInstant> actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
assertTrue(actual.isPresent());
assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean in
timeline, retain first replace commit");
@@ -168,7 +168,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();
- actual =
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
assertEquals(clusterTime3, actual.get().getTimestamp(),
"retain the first replace commit after the earliestInstantToRetain ");
}
@@ -206,7 +206,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3,
Option.empty());
metaClient.reloadActiveTimeline();
- Option<HoodieInstant> actual =
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ Option<HoodieInstant> actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
assertEquals(clusterTime2, actual.get().getTimestamp(),
"retain the first replace commit after the last complete clean ");
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index b7855bec767..7347e2ab696 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -285,7 +285,7 @@ public class TestCompactionUtils extends
HoodieCommonTestHarness {
@ValueSource(booleans = {true, false})
public void testGetOldestInstantToKeepForCompaction(boolean
hasCompletedCompaction) {
HoodieActiveTimeline timeline = prepareTimeline(hasCompletedCompaction);
- Option<HoodieInstant> actual =
CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20);
+ Option<HoodieInstant> actual =
CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 20);
if (hasCompletedCompaction) {
assertEquals(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
"06"), actual.get());
@@ -293,17 +293,17 @@ public class TestCompactionUtils extends
HoodieCommonTestHarness {
assertEquals(new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"), actual.get());
}
- actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline,
3);
+ actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline,
3);
assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
"07"), actual.get());
- actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline,
2);
+ actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline,
2);
assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
"08"), actual.get());
}
@Test
public void testGetOldestInstantToKeepForCompactionWithEmptyDeltaCommits() {
HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
- assertEquals(Option.empty(),
CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20));
+ assertEquals(Option.empty(),
CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 20));
}
private HoodieActiveTimeline prepareTimeline(boolean hasCompletedCompaction)
{