This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7863348c526 [HUDI-6678] Fix the acquisition of clean&rollback instants 
to archive (#9416)
7863348c526 is described below

commit 7863348c52656788c68613dcd403659436347b75
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Sep 17 07:44:08 2023 +0800

    [HUDI-6678] Fix the acquisition of clean&rollback instants to archive 
(#9416)
    
    The current `getCleanInstantsToArchive` groups clean and rollback instants by
    action and filters each group separately against maxInstantsToKeep and
    minInstantsToKeep. This has two disadvantages: (1) if a user has only a few
    rollback instants (not enough to reach the archival threshold), they stay in
    the active timeline forever, even after the number of commit instants has
    grown very large, which can be very confusing for users; (2) archiving clean
    and rollback instants independently of commit instants leaves holes in the
    active timeline. This commit improves the logic [...]
---
 .../client/timeline/HoodieTimelineArchiver.java    | 244 +++++++++++----------
 .../hudi/table/action/clean/CleanPlanner.java      |   2 +-
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 176 ++++++---------
 .../apache/hudi/common/util/ClusteringUtils.java   |   8 +-
 .../apache/hudi/common/util/CompactionUtils.java   |   6 +-
 .../hudi/common/util/TestClusteringUtils.java      |   6 +-
 .../hudi/common/util/TestCompactionUtils.java      |   8 +-
 7 files changed, 210 insertions(+), 240 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index 52e6e605594..dc761e23804 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -57,9 +56,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
@@ -128,140 +126,104 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
     }
   }
 
-  private Stream<HoodieInstant> getCleanInstantsToArchive() {
+  private List<HoodieInstant> 
getCleanAndRollbackInstantsToArchive(HoodieInstant 
latestCommitInstantToArchive) {
     HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
-        
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, 
HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
+        
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, 
HoodieTimeline.ROLLBACK_ACTION))
+        .filterCompletedInstants();
+
+    // Since the commit instants to archive are contiguous, we can use the latest commit instant to archive as the
+    // right boundary to collect the clean or rollback instants to archive.
+    //
+    //                                                  
latestCommitInstantToArchive
+    //                                                               v
+    //  | commit1 clean1 commit2 commit3 clean2 commit4 rollback1 commit5 | 
commit6 clean3 commit7 ...
+    //  | <------------------  instants to archive  --------------------> |
+    //
+    //  CommitInstantsToArchive: commit1, commit2, commit3, commit4, commit5
+    //  CleanAndRollbackInstantsToArchive: clean1, clean2, rollback1
+
     return cleanAndRollbackTimeline.getInstantsAsStream()
-        
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
-        .map(hoodieInstants -> {
-          if (hoodieInstants.size() > this.maxInstantsToKeep) {
-            return hoodieInstants.subList(0, hoodieInstants.size() - 
this.minInstantsToKeep);
-          } else {
-            return Collections.<HoodieInstant>emptyList();
-          }
-        }).flatMap(Collection::stream);
+        .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
latestCommitInstantToArchive.getTimestamp()))
+        .collect(Collectors.toList());
   }
 
-  private Stream<HoodieInstant> getCommitInstantsToArchive() throws 
IOException {
-    // TODO (na) : Add a way to return actions associated with a timeline and 
then merge/unify
-    // with logic above to avoid Stream.concat
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+  private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+    HoodieTimeline completedCommitsTimeline = 
table.getCompletedCommitsTimeline();
+
+    if (completedCommitsTimeline.countInstants() <= maxInstantsToKeep) {
+      return Collections.emptyList();
+    }
+
+    // Step1: Get all candidates of earliestInstantToRetain.
+    List<Option<HoodieInstant>> earliestInstantToRetainCandidates = new 
ArrayList<>();
 
-    // Get the oldest inflight instant and a completed commit before this 
inflight instant.
-    Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
+    // 1. Earliest commit to retain is the greatest completed commit, that is 
less than the earliest pending instant.
+    // In some cases when inflight is the lowest commit then earliest commit 
to retain will be equal to the earliest
+    // inflight commit.
+    Option<HoodieInstant> earliestPendingInstant = table.getActiveTimeline()
         .getWriteTimeline()
         .filter(instant -> !instant.isCompleted())
         .firstInstant();
 
-    // Oldest commit to retain is the greatest completed commit, that is less 
than the oldest pending instant.
-    // In some cases when inflight is the lowest commit then oldest commit to 
retain will be equal to oldest
-    // inflight commit.
-    Option<HoodieInstant> oldestCommitToRetain;
-    if (oldestPendingInstant.isPresent()) {
-      Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
-          Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
-              .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-                  LESSER_THAN, 
oldestPendingInstant.get().getTimestamp())).findFirst());
-      // Check if the completed instant is higher than the oldest inflight 
instant
-      // in that case update the oldestCommitToRetain to oldestInflight commit 
time.
-      if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
-        oldestCommitToRetain = oldestPendingInstant;
+    Option<HoodieInstant> earliestCommitToRetain;
+    if (earliestPendingInstant.isPresent()) {
+      Option<HoodieInstant> completedCommitBeforeEarliestPendingInstant = 
Option.fromJavaOptional(completedCommitsTimeline
+          .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, 
earliestPendingInstant.get().getTimestamp()))
+          .getReverseOrderedInstants().findFirst());
+      // Check if the completed instant is higher than the earliest inflight 
instant
+      // in that case update the earliestCommitToRetain to earliestInflight 
commit time.
+      if (!completedCommitBeforeEarliestPendingInstant.isPresent()) {
+        earliestCommitToRetain = earliestPendingInstant;
       } else {
-        oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
+        earliestCommitToRetain = completedCommitBeforeEarliestPendingInstant;
       }
     } else {
-      oldestCommitToRetain = Option.empty();
+      earliestCommitToRetain = Option.empty();
     }
-
-    // NOTE: We cannot have any holes in the commit timeline.
-    // We cannot archive any commits which are made after the first savepoint 
present,
-    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
-    Option<HoodieInstant> firstSavepoint = 
table.getCompletedSavepointTimeline().firstInstant();
-    Set<String> savepointTimestamps = table.getSavepointTimestamps();
-    if (!commitTimeline.empty() && commitTimeline.countInstants() > 
maxInstantsToKeep) {
-      // For Merge-On-Read table, inline or async compaction is enabled
-      // We need to make sure that there are enough delta commits in the 
active timeline
-      // to trigger compaction scheduling, when the trigger strategy of 
compaction is
-      // NUM_COMMITS or NUM_AND_TIME.
-      Option<HoodieInstant> oldestInstantToRetainForCompaction =
-          (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
-              && (config.getInlineCompactTriggerStrategy() == 
CompactionTriggerStrategy.NUM_COMMITS
-              || config.getInlineCompactTriggerStrategy() == 
CompactionTriggerStrategy.NUM_AND_TIME))
-              ? CompactionUtils.getOldestInstantToRetainForCompaction(
-              table.getActiveTimeline(), 
config.getInlineCompactDeltaCommitMax())
-              : Option.empty();
-
-      // The clustering commit instant can not be archived unless we ensure 
that the replaced files have been cleaned,
-      // without the replaced files metadata on the timeline, the fs view 
would expose duplicates for readers.
-      // Meanwhile, when inline or async clustering is enabled, we need to 
ensure that there is a commit in the active timeline
-      // to check whether the file slice generated in pending clustering after 
archive isn't committed.
-      Option<HoodieInstant> oldestInstantToRetainForClustering =
-          
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
-
-      // Actually do the commits
-      Stream<HoodieInstant> instantToArchiveStream = 
commitTimeline.getInstantsAsStream()
-          .filter(s -> {
-            if (config.shouldArchiveBeyondSavepoint()) {
-              // skip savepoint commits and proceed further
-              return !savepointTimestamps.contains(s.getTimestamp());
-            } else {
-              // if no savepoint present, then don't filter
-              // stop at first savepoint commit
-              return !(firstSavepoint.isPresent() && 
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, 
s.getTimestamp()));
-            }
-          }).filter(s -> {
-            // oldestCommitToRetain is the highest completed commit instant 
that is less than the oldest inflight instant.
-            // By filtering out any commit >= oldestCommitToRetain, we can 
ensure there are no gaps in the timeline
-            // when inflight commits are present.
-            return oldestCommitToRetain
-                .map(instant -> compareTimestamps(instant.getTimestamp(), 
GREATER_THAN, s.getTimestamp()))
-                .orElse(true);
-          }).filter(s ->
-              oldestInstantToRetainForCompaction.map(instantToRetain ->
-                      compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instantToRetain.getTimestamp()))
-                  .orElse(true)
-          ).filter(s ->
-              oldestInstantToRetainForClustering.map(instantToRetain ->
-                      HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, instantToRetain.getTimestamp()))
-                  .orElse(true)
-          );
-      return instantToArchiveStream.limit(commitTimeline.countInstants() - 
minInstantsToKeep);
-    } else {
-      return Stream.empty();
-    }
-  }
-
-  private Stream<ActiveAction> getInstantsToArchive() throws IOException {
-    if (config.isMetaserverEnabled()) {
-      return Stream.empty();
-    }
-
-    // For archiving and cleaning instants, we need to include intermediate 
state files if they exist
-    HoodieActiveTimeline rawActiveTimeline = new 
HoodieActiveTimeline(metaClient, false);
-    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = 
rawActiveTimeline.getInstantsAsStream()
-            .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
-                    HoodieInstant.getComparableAction(i.getAction()))));
-
-    Stream<HoodieInstant> instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
-
-    // If metadata table is enabled, do not archive instants which are more 
recent than the last compaction on the
+    earliestInstantToRetainCandidates.add(earliestCommitToRetain);
+
+    // 2. For Merge-On-Read table, inline or async compaction is enabled
+    // We need to make sure that there are enough delta commits in the active 
timeline
+    // to trigger compaction scheduling, when the trigger strategy of 
compaction is
+    // NUM_COMMITS or NUM_AND_TIME.
+    Option<HoodieInstant> earliestInstantToRetainForCompaction =
+        (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
+            && (config.getInlineCompactTriggerStrategy() == 
CompactionTriggerStrategy.NUM_COMMITS
+            || config.getInlineCompactTriggerStrategy() == 
CompactionTriggerStrategy.NUM_AND_TIME))
+            ? CompactionUtils.getEarliestInstantToRetainForCompaction(
+            table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
+            : Option.empty();
+    
earliestInstantToRetainCandidates.add(earliestInstantToRetainForCompaction);
+
+    // 3. The clustering commit instant can not be archived unless we ensure 
that the replaced files have been cleaned,
+    // without the replaced files metadata on the timeline, the fs view would 
expose duplicates for readers.
+    // Meanwhile, when inline or async clustering is enabled, we need to 
ensure that there is a commit in the active timeline
+    // to check whether the file slice generated in pending clustering after 
archive isn't committed.
+    Option<HoodieInstant> earliestInstantToRetainForClustering =
+        
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
+    
earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);
+
+    // 4. If metadata table is enabled, do not archive instants which are more 
recent than the last compaction on the
     // metadata table.
     if (table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
       try (HoodieTableMetadata tableMetadata = 
HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), 
config.getBasePath())) {
         Option<String> latestCompactionTime = 
tableMetadata.getLatestCompactionTime();
         if (!latestCompactionTime.isPresent()) {
           LOG.info("Not archiving as there is no compaction yet on the 
metadata table");
-          instants = Stream.empty();
+          return Collections.emptyList();
         } else {
           LOG.info("Limiting archiving of instants to latest compaction on 
metadata table at " + latestCompactionTime.get());
-          instants = instants.filter(instant -> 
compareTimestamps(instant.getTimestamp(), LESSER_THAN,
-              latestCompactionTime.get()));
+          earliestInstantToRetainCandidates.add(Option.of(new HoodieInstant(
+              HoodieInstant.State.COMPLETED, COMPACTION_ACTION, 
latestCompactionTime.get())));
         }
       } catch (Exception e) {
         throw new HoodieException("Error limiting instant archival based on 
metadata table", e);
       }
     }
 
+    // 5. If this is a metadata table, do not archive the commits that live in 
data set
+    // active timeline. This is required by metadata table,
+    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
     if (table.isMetadataTable()) {
       HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
           
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
@@ -282,16 +244,62 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
       // CLEAN or ROLLBACK instant can block the archive of metadata table 
timeline and causes
       // the active timeline of metadata table to be extremely long, leading 
to performance issues
       // for loading the timeline.
-      if (qualifiedEarliestInstant.isPresent()) {
-        instants = instants.filter(instant ->
-            compareTimestamps(
-                instant.getTimestamp(),
-                HoodieTimeline.LESSER_THAN,
-                qualifiedEarliestInstant.get().getTimestamp()));
-      }
+      earliestInstantToRetainCandidates.add(qualifiedEarliestInstant);
     }
 
-    return instants.map(hoodieInstant -> {
+    // Choose the instant in earliestInstantToRetainCandidates with the 
smallest
+    // timestamp as earliestInstantToRetain.
+    java.util.Optional<HoodieInstant> earliestInstantToRetain = 
earliestInstantToRetainCandidates
+        .stream()
+        .filter(Option::isPresent)
+        .map(Option::get)
+        .min(HoodieInstant.COMPARATOR);
+
+    // Step2: We cannot archive any commits which are made after the first 
savepoint present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+    Option<HoodieInstant> firstSavepoint = 
table.getCompletedSavepointTimeline().firstInstant();
+    Set<String> savepointTimestamps = table.getSavepointTimestamps();
+
+    Stream<HoodieInstant> instantToArchiveStream = 
completedCommitsTimeline.getInstantsAsStream()
+        .filter(s -> {
+          if (config.shouldArchiveBeyondSavepoint()) {
+            // skip savepoint commits and proceed further
+            return !savepointTimestamps.contains(s.getTimestamp());
+          } else {
+            // if no savepoint present, then don't filter
+            // stop at first savepoint commit
+            return !firstSavepoint.isPresent() || 
compareTimestamps(s.getTimestamp(), LESSER_THAN, 
firstSavepoint.get().getTimestamp());
+          }
+        }).filter(s -> earliestInstantToRetain
+            .map(instant -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instant.getTimestamp()))
+            .orElse(true));
+    return 
instantToArchiveStream.limit(completedCommitsTimeline.countInstants() - 
minInstantsToKeep)
+        .collect(Collectors.toList());
+  }
+
+  private Stream<ActiveAction> getInstantsToArchive() throws IOException {
+    if (config.isMetaserverEnabled()) {
+      return Stream.empty();
+    }
+
+    // First get commit instants to archive.
+    List<HoodieInstant> instantsToArchive = getCommitInstantsToArchive();
+    if (!instantsToArchive.isEmpty()) {
+      HoodieInstant latestCommitInstantToArchive = 
instantsToArchive.get(instantsToArchive.size() - 1);
+      // Then get clean and rollback instants to archive.
+      List<HoodieInstant> cleanAndRollbackInstantsToArchive =
+          getCleanAndRollbackInstantsToArchive(latestCommitInstantToArchive);
+      instantsToArchive.addAll(cleanAndRollbackInstantsToArchive);
+      instantsToArchive.sort(HoodieInstant.COMPARATOR);
+    }
+
+    // For archive, we need to include instant's all states.
+    HoodieActiveTimeline rawActiveTimeline = new 
HoodieActiveTimeline(metaClient, false);
+    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = 
rawActiveTimeline.getInstantsAsStream()
+        .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
+            HoodieInstant.getComparableAction(i.getAction()))));
+
+    return instantsToArchive.stream().map(hoodieInstant -> {
       List<HoodieInstant> instantsToStream = 
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
           HoodieInstant.getComparableAction(hoodieInstant.getAction())));
       return ActiveAction.fromInstants(instantsToStream);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 86070844701..112034fd877 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -506,7 +506,7 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
   }
 
   /**
-   * Returns earliest commit to retain based on cleaning policy.
+   * Returns the earliest commit to retain based on cleaning policy.
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     return CleanerUtils.getEarliestCommitToRetain(
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 0000c171839..724239a7738 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -832,12 +832,12 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
         // only time when archival will kick in
         List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
         
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000003")));
-        
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000004"), HoodieTimeline.ROLLBACK_ACTION));
+        
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000002"),
 HoodieTimeline.ROLLBACK_ACTION));
         List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
         expectedActiveInstants.addAll(getActiveCommitInstants(
             Arrays.asList("00000005", "00000007", "00000009", "00000011")));
         expectedActiveInstants.addAll(getActiveCommitInstants(
-            Arrays.asList("00000006", "00000008", "00000010", "00000012"), 
HoodieTimeline.ROLLBACK_ACTION));
+            Arrays.asList("00000004", "00000006", "00000008", "00000010", 
"00000012"), HoodieTimeline.ROLLBACK_ACTION));
         verifyArchival(expectedArchivedInstants, expectedActiveInstants, 
commitsAfterArchival);
       }
     }
@@ -977,25 +977,21 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws 
Exception {
-    HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 4, 5, 2);
-
-    // min archival commits is 4 and max archival commits is 5
-    // (either clean commits has to be > 5 or commits has to be greater than 5)
-    // and so, after 6th instant, 2 instants will be archived.
-    // 1,2,3,4,5,6 : after archival -> 1,3,4,5,6
-    // (because, 2,3,4,5 and 6 are clean instants and are eligible for 
archival)
-    // after 7th and 9th instant no-op wrt archival.  After 8th instant,
-    // archival kicks in when metadata table is enabled.
+    HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 8);
+    // Min archival commits is 2 and max archival commits is 4.
+    // When metadata table is not enabled, after 5th write instant, archive 
will be triggered.
+    // When metadata table is enabled, after 8th instant (6 write instants + 2 
clean instants) >= maxDeltaCommitsMetadataTable,
+    // archival kicks in when compaction in metadata table triggered.
     Map<String, Integer> cleanStats = new HashMap<>();
     cleanStats.put("p1", 1);
     cleanStats.put("p2", 2);
-    for (int i = 1; i <= 10; i++) {
+    for (int i = 1; i <= 8; i++) {
       if (i == 1) {
-        testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i 
== 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", 
"p2"), 20);
-      } else if (i <= 7 || i == 9) {
-        testTable.doClean("0000000" + i, cleanStats);
+        testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", 
"p2"), 20);
+      } else if (i <= 3) {
+        testTable.doClean(String.format("%08d", i), cleanStats);
       } else {
-        testTable.doWriteOperation("000000" + String.format("%02d", i), 
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : 
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+        testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 
2);
       }
       // trigger archival
       Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
@@ -1005,37 +1001,34 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
         assertEquals(originalCommits, commitsAfterArchival);
       } else if (i == 7) {
         if (!enableMetadata) {
-          // 1,2,3,4,5,6,7 : after archival -> 1,4,5,6,7 (bcoz, 2,3,4,5,6,7 
are clean instants and are eligible for archival)
-          List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-          
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001")));
-          expectedActiveInstants.addAll(
-              getActiveCommitInstants(Arrays.asList("00000004", "00000005", 
"00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
-          verifyArchival(getAllArchivedCommitInstants(
-              Arrays.asList("00000002", "00000003"), 
HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
+          // do archive:
+          // clean: 2,3: after archival -> null
+          // write: 1,4,5,6,7: after archival -> 6, 7
+          List<HoodieInstant> expectedActiveInstants = new 
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000006", "00000007")));
+
+          List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000004", "00000005")));
+          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003"), HoodieTimeline.CLEAN_ACTION));
+
+          verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival);
         } else {
-          // with metadata enabled, archival in data table is fenced based on 
compaction in metadata table. Clean commits in data table will not trigger 
compaction in
-          // metadata table.
-          List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-          
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001")));
-          expectedActiveInstants.addAll(getActiveCommitInstants(
-              Arrays.asList("00000002", "00000003", "00000004", "00000005", 
"00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
-          verifyArchival(getAllArchivedCommitInstants(Collections.emptyList(), 
HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
+          // with metadata enabled, archival in data table is fenced based on 
compaction in metadata table.
+          assertEquals(originalCommits, commitsAfterArchival);
         }
       } else {
         if (!enableMetadata) {
           assertEquals(originalCommits, commitsAfterArchival);
         } else {
-          if (i == 8) {
-            // when i == 7 compaction in metadata table will be triggered
-            // and after wards archival in datatable will kick in when i == 8.
-            // 1,2,3,4,5,6,7,8 : after archival -> 1,4,5,6,7,8 (bcoz, 2,3,4,5 
and 6 are clean commits and are eligible for archival)
-            List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-            
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", 
"00000008")));
-            
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000004", 
"00000005", "00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
-            
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", 
"00000003"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, 
commitsAfterArchival);
-          } else {
-            assertEquals(originalCommits, commitsAfterArchival);
-          }
+          // when i == 8 compaction in metadata table will be triggered, and 
then allow archive:
+          // clean: 2,3: after archival -> null
+          // write: 1,4,5,6,7,8: after archival -> 7, 8
+          List<HoodieInstant> expectedActiveInstants = new 
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
+
+          List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000004", "00000005", "00000006")));
+          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003"), HoodieTimeline.CLEAN_ACTION));
+
+          verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival);
         }
       }
     }
@@ -1043,40 +1036,43 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
 
   @Test
   public void testArchiveRollbacksAndCleanTestTable() throws Exception {
-    int minArchiveCommits = 4;
-    int maxArchiveCommits = 9;
+    int minArchiveCommits = 2;
+    int maxArchiveCommits = 4;
     HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 
minArchiveCommits, maxArchiveCommits, 2);
 
-    // trigger 1 commit to add lot of files so that future cleans can clean 
them up
-    testTable.doWriteOperation("00000001", WriteOperationType.UPSERT, 
Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
+    // trigger 1 commit to add a lot of files so that future cleans can clean 
them up
+    testTable.doWriteOperation(String.format("%08d", 1), 
WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", 
"p2"), 20);
 
     Map<String, Integer> partitionToFileDeleteCount = new HashMap<>();
     partitionToFileDeleteCount.put("p1", 1);
     partitionToFileDeleteCount.put("p2", 1);
-    // we are triggering 10 clean commits. (1 is commit, 2 -> 11 is clean)
-    for (int i = 2; i <= (maxArchiveCommits + 2); i++) {
-      testTable.doClean((i > 9 ? ("000000") : ("0000000")) + i, 
partitionToFileDeleteCount);
+
+    for (int i = 2; i < 5; i++) {
+      testTable.doClean(String.format("%08d", i), partitionToFileDeleteCount);
     }
 
-    // we are triggering 7 commits and 7 rollbacks for the same
-    for (int i = 12; i <= (2 * maxArchiveCommits); i += 2) {
-      testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT, 
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
-      testTable.doRollback("000000" + i, "000000" + (i + 1));
+    for (int i = 5; i <= 11; i += 2) {
+      testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 
2);
+      testTable.doRollback(String.format("%08d", i), String.format("%08d", i + 
1));
     }
 
-    // trigger archival
+    // trigger archival:
+    // clean: 2,3,4: after archival -> null
+    // write: 1,5,7,9,11: after archival -> 9,11
+    // rollback: 6,8,10,12: after archival -> 8,10,12
     Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
-    List<HoodieInstant> originalCommits = commitsList.getKey();
     List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
 
-    // out of 10 clean commits, 6 will be archived. 2 to 7. 8 to 11 will be 
active.
-    // wrt regular commits, there aren't 9 commits yet and so all of them will 
be active.
     List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009", "00000010", "00000011"), HoodieTimeline.CLEAN_ACTION));
-    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", 
"00000012", "00000014", "00000016", "00000018")));
-    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000013", 
"00000015", "00000017", "00000019"), HoodieTimeline.ROLLBACK_ACTION));
-    verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", 
"00000003", "00000004", "00000005", "00000006", "00000007"),
-        HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, 
commitsAfterArchival);
+    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000010", "00000012"), HoodieTimeline.ROLLBACK_ACTION));
+    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009", 
"00000011")));
+
+    List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+    
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000005", "00000007")));
+    
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION));
+    
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000006"),
 HoodieTimeline.ROLLBACK_ACTION));
+
+    verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival);
   }
 
   @ParameterizedTest
@@ -1098,13 +1094,13 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     int startInstant = 1;
     List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
-      createCleanMetadata(startInstant + "", false, false, isEmpty || i % 2 == 
0);
-      expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.CLEAN_ACTION, startInstant + ""));
+      createCleanMetadata(String.format("%02d", startInstant), false, false, 
isEmpty || i % 2 == 0);
+      expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.CLEAN_ACTION, String.format("%02d", startInstant)));
     }
 
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
       createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", 
false, isEmpty || i % 2 == 0);
-      expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.ROLLBACK_ACTION, startInstant + ""));
+      expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.ROLLBACK_ACTION, String.format("%02d", startInstant)));
     }
 
     if (enableMetadataTable) {
@@ -1121,8 +1117,11 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     Stream<HoodieInstant> currentInstants = 
metaClient.getActiveTimeline().reload().getInstantsAsStream();
     Map<Object, List<HoodieInstant>> actionInstantMap = 
currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
 
-    assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must 
be preset");
-    assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), 
"Should have min instant");
+    // The commit order is: clean, clean, clean, ..., commit, rollback, 
commit, rollback ...
+    // So after archive, actionInstantMap will contain commit and rollback,
+    // the number will be equal to minInstantsToKeep
+    assertTrue(actionInstantMap.containsKey("commit"), "Commit Action key must 
be preset");
+    assertEquals(minInstantsToKeep, actionInstantMap.get("commit").size(), 
"Should have min instant");
 
     assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key 
must be preset");
     assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), 
"Should have min instant");
@@ -1136,43 +1135,6 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     expectedArchivedInstants.forEach(entry -> 
assertTrue(metaClient.getArchivedTimeline().containsInstant(entry)));
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testArchiveInflightClean(boolean enableMetadataTable) throws 
Exception {
-    init();
-    HoodieWriteConfig cfg =
-        
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
-            .withParallelism(2, 2).forTable("test-trip-table")
-            
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
-            
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 
3).build())
-            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-                .withRemoteServerPort(timelineServicePort).build())
-            
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
-            .build();
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-
-    createCleanMetadata("10", false);
-    createCleanMetadata("11", false);
-    HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false);
-    HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false);
-    HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true);
-
-    if (enableMetadataTable) {
-      // Simulate a compaction commit in metadata table timeline
-      // so the archival in data table can happen
-      createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, 
"14");
-    }
-
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
-
-    archiver.archiveIfRequired(context);
-
-    List<HoodieInstant> notArchivedInstants = 
metaClient.getActiveTimeline().reload().getInstants();
-    assertEquals(3, notArchivedInstants.size(), "Not archived instants should 
be 3");
-    assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, 
notArchivedInstant2, notArchivedInstant3), "");
-  }
-
   @Test
   public void testArchiveTableWithMetadataTableCompaction() throws Exception {
     HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 
7);
@@ -1637,14 +1599,14 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
   }
 
   private void verifyArchival(List<HoodieInstant> expectedArchivedInstants, 
List<HoodieInstant> expectedActiveInstants, List<HoodieInstant> 
commitsAfterArchival) {
-    Collections.sort(expectedActiveInstants, 
Comparator.comparing(HoodieInstant::getTimestamp));
-    Collections.sort(commitsAfterArchival, 
Comparator.comparing(HoodieInstant::getTimestamp));
+    
expectedActiveInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
+    
commitsAfterArchival.sort(Comparator.comparing(HoodieInstant::getTimestamp));
     assertEquals(expectedActiveInstants, commitsAfterArchival);
     expectedArchivedInstants.forEach(entry -> 
assertFalse(commitsAfterArchival.contains(entry)));
     HoodieArchivedTimeline archivedTimeline = new 
HoodieArchivedTimeline(metaClient);
     List<HoodieInstant> actualArchivedInstants = 
archivedTimeline.getInstants();
-    Collections.sort(actualArchivedInstants, 
Comparator.comparing(HoodieInstant::getTimestamp));
-    Collections.sort(expectedArchivedInstants, 
Comparator.comparing(HoodieInstant::getTimestamp));
+    
actualArchivedInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
+    
expectedArchivedInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
     assertEquals(actualArchivedInstants, expectedArchivedInstants);
 
     HoodieTimeline timeline = metaClient.getActiveTimeline();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index e50431c7398..894db11a2d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -236,14 +236,14 @@ public class ClusteringUtils {
   }
 
   /**
-   * Returns the oldest instant to retain.
-   * Make sure the clustering instant won't be archived before cleaned, and 
the oldest inflight clustering instant has a previous commit.
+   * Returns the earliest instant to retain.
+   * Make sure the clustering instant won't be archived before cleaned, and 
the earliest inflight clustering instant has a previous commit.
    *
    * @param activeTimeline The active timeline
    * @param metaClient     The meta client
-   * @return the oldest instant to retain for clustering
+   * @return the earliest instant to retain for clustering
    */
-  public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+  public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
       HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) 
throws IOException {
     Option<HoodieInstant> oldestInstantToRetain = Option.empty();
     HoodieTimeline replaceTimeline = 
activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 0f41f1314e1..6b74dd869a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -339,7 +339,7 @@ public class CompactionUtils {
   }
 
   /**
-   * Gets the oldest instant to retain for MOR compaction.
+   * Gets the earliest instant to retain for MOR compaction.
    * If there is no completed compaction,
    * num delta commits >= "hoodie.compact.inline.max.delta.commits"
    * If there is a completed compaction,
@@ -348,9 +348,9 @@ public class CompactionUtils {
    * @param activeTimeline  Active timeline of a table.
    * @param maxDeltaCommits Maximum number of delta commits that trigger the 
compaction plan,
    *                        i.e., "hoodie.compact.inline.max.delta.commits".
-   * @return the oldest instant to keep for MOR compaction.
+   * @return the earliest instant to keep for MOR compaction.
    */
-  public static Option<HoodieInstant> getOldestInstantToRetainForCompaction(
+  public static Option<HoodieInstant> getEarliestInstantToRetainForCompaction(
       HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
     Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfoOption =
         CompactionUtils.getDeltaCommitsSinceLatestCompaction(activeTimeline);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 4e76d25f41f..9028fe63fd4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -146,7 +146,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     HoodieInstant inflightInstant3 = 
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3,
 Option.empty());
     HoodieInstant completedInstant3 = 
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3,
 Option.empty());
     metaClient.reloadActiveTimeline();
-    Option<HoodieInstant> actual = 
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
+    Option<HoodieInstant> actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
     assertTrue(actual.isPresent());
     assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean in 
timeline, retain first replace commit");
 
@@ -168,7 +168,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
     metaClient.reloadActiveTimeline();
-    actual = 
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
+    actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
     assertEquals(clusterTime3, actual.get().getTimestamp(),
         "retain the first replace commit after the earliestInstantToRetain ");
   }
@@ -206,7 +206,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3,
 Option.empty());
     metaClient.reloadActiveTimeline();
 
-    Option<HoodieInstant> actual = 
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
+    Option<HoodieInstant> actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
     assertEquals(clusterTime2, actual.get().getTimestamp(),
         "retain the first replace commit after the last complete clean ");
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index b7855bec767..7347e2ab696 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -285,7 +285,7 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
   @ValueSource(booleans = {true, false})
   public void testGetOldestInstantToKeepForCompaction(boolean 
hasCompletedCompaction) {
     HoodieActiveTimeline timeline = prepareTimeline(hasCompletedCompaction);
-    Option<HoodieInstant> actual = 
CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20);
+    Option<HoodieInstant> actual = 
CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 20);
 
     if (hasCompletedCompaction) {
       assertEquals(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, 
"06"), actual.get());
@@ -293,17 +293,17 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
       assertEquals(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"), actual.get());
     }
 
-    actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 
3);
+    actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 
3);
     assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"07"), actual.get());
 
-    actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 
2);
+    actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 
2);
     assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"08"), actual.get());
   }
 
   @Test
   public void testGetOldestInstantToKeepForCompactionWithEmptyDeltaCommits() {
     HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
-    assertEquals(Option.empty(), 
CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20));
+    assertEquals(Option.empty(), 
CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 20));
   }
 
   private HoodieActiveTimeline prepareTimeline(boolean hasCompletedCompaction) 
{


Reply via email to