This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf5ec3503a6 [HUDI-7779] Guard archival on savepoint removal until 
cleaner is able to clean it up (#11440)
bf5ec3503a6 is described below

commit bf5ec3503a6bfb0cc9b8352d290cee60fb3a84ac
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jul 19 16:09:25 2024 +0530

    [HUDI-7779] Guard archival on savepoint removal until cleaner is able to 
clean it up (#11440)
---
 .../client/timeline/HoodieTimelineArchiver.java    |   3 +-
 .../action/clean/CleanPlanActionExecutor.java      |   6 +-
 .../hudi/table/action/clean/CleanPlanner.java      |  31 ++-
 .../apache/hudi/table/action/TestCleanPlanner.java | 169 ++++++++++----
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 249 +++++++++++++++++----
 .../table/timeline/HoodieDefaultTimeline.java      |   6 +
 .../hudi/common/table/timeline/HoodieTimeline.java |  31 ++-
 .../org/apache/hudi/common/util/CleanerUtils.java  |   2 +-
 .../apache/hudi/common/util/ClusteringUtils.java   |  67 ++++--
 .../table/timeline/TestHoodieActiveTimeline.java   |  11 +
 .../hudi/common/testutils/HoodieTestTable.java     |  11 +-
 .../hudi/common/util/TestClusteringUtils.java      |   6 +-
 12 files changed, 462 insertions(+), 130 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index 817c3f650d9..dd091386d26 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -208,7 +208,8 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
     // Meanwhile, when inline or async clustering is enabled, we need to 
ensure that there is a commit in the active timeline
     // to check whether the file slice generated in pending clustering after 
archive isn't committed.
     Option<HoodieInstant> earliestInstantToRetainForClustering =
-        
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
+        
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient(),
+            config.getCleanerPolicy());
     
earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);
 
     // 4. If metadata table is enabled, do not archive instants which are more 
recent than the last compaction on the
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 0329fc8ddc6..613ed979367 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
 import static org.apache.hudi.common.util.MapUtils.nonEmpty;
-import static 
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
+import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS;
 
 public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, 
I, K, O, Option<HoodieCleanerPlan>> {
 
@@ -159,7 +159,9 @@ public class CleanPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I
     if (savepointedTimestamps.isEmpty()) {
       return Collections.emptyMap();
     } else {
-      return Collections.singletonMap(SAVEPOINTED_TIMESTAMPS, 
savepointedTimestamps.stream().collect(Collectors.joining(",")));
+      Map<String, String> extraMetadata = new HashMap<>();
+      extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointedTimestamps.stream().collect(Collectors.joining(",")));
+      return extraMetadata;
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index fff86ab8659..57546252cad 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -78,7 +78,6 @@ public class CleanPlanner<T, I, K, O> implements Serializable 
{
   public static final Integer CLEAN_PLAN_VERSION_1 = 
CleanPlanV1MigrationHandler.VERSION;
   public static final Integer CLEAN_PLAN_VERSION_2 = 
CleanPlanV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
-  public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";
 
   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
@@ -88,6 +87,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable 
{
   private final HoodieWriteConfig config;
   private transient HoodieEngineContext context;
   private List<String> savepointedTimestamps;
+  private Option<HoodieInstant> earliestCommitToRetain = Option.empty();
 
   public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> 
hoodieTable, HoodieWriteConfig config) {
     this.context = context;
@@ -126,11 +126,6 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
     return metadata.getPartitionMetadata().values().stream().flatMap(s -> 
s.getSavepointDataFile().stream());
   }
 
-  private Stream<String> getPartitionsFromSavepoint(String savepointTime) {
-    HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
-    return metadata.getPartitionMetadata().keySet().stream();
-  }
-
   private HoodieSavepointMetadata getSavepointMetadata(String 
savepointTimestamp) {
     if (!hoodieTable.getSavepointTimestamps().contains(savepointTimestamp)) {
       throw new HoodieSavepointException(
@@ -203,8 +198,9 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
    */
   private List<String> 
getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
       Option<HoodieInstant> newInstantToRetain) {
-    boolean isSavepointDeleted = isAnySavepointDeleted(cleanMetadata);
-    if (isSavepointDeleted) {
+
+    boolean isAnySavepointDeleted = isAnySavepointDeleted(cleanMetadata);
+    if (isAnySavepointDeleted) {
       LOG.info("Since savepoints have been removed compared to previous clean, 
triggering clean planning for all partitions");
       return getPartitionPathsForFullCleaning();
     } else {
@@ -222,7 +218,7 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
 
   private boolean isAnySavepointDeleted(HoodieCleanMetadata cleanMetadata) {
     List<String> savepointedTimestampsFromLastClean = 
cleanMetadata.getExtraMetadata() == null ? Collections.emptyList()
-        : 
Arrays.stream(cleanMetadata.getExtraMetadata().getOrDefault(SAVEPOINTED_TIMESTAMPS,
 StringUtils.EMPTY_STRING).split(","))
+        : 
Arrays.stream(cleanMetadata.getExtraMetadata().getOrDefault(CleanerUtils.SAVEPOINTED_TIMESTAMPS,
 StringUtils.EMPTY_STRING).split(","))
         .filter(partition -> 
!StringUtils.isNullOrEmpty(partition)).collect(Collectors.toList());
     if (savepointedTimestampsFromLastClean.isEmpty()) {
       return false;
@@ -553,13 +549,16 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
    * Returns the earliest commit to retain based on cleaning policy.
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
-    return CleanerUtils.getEarliestCommitToRetain(
-        
hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
-        config.getCleanerPolicy(),
-        config.getCleanerCommitsRetained(),
-        Instant.now(),
-        config.getCleanerHoursRetained(),
-        hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
+    if (!earliestCommitToRetain.isPresent()) {
+      earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
+          
hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
+          config.getCleanerPolicy(),
+          config.getCleanerCommitsRetained(),
+          Instant.now(),
+          config.getCleanerHoursRetained(),
+          hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
+    }
+    return earliestCommitToRetain;
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index 179454e2063..c4ffa8c1bac 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -19,8 +19,10 @@
 
 package org.apache.hudi.table.action;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -40,6 +42,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -70,10 +73,12 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
 import static 
org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
-import static 
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
+import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -132,17 +137,23 @@ public class TestCleanPlanner {
     
when(mockFsView.getAllFileGroupsStateless(partitionPath)).thenReturn(allFileGroups.stream());
 
     CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context, 
mockHoodieTable, config);
-    HoodieInstant earliestCommitToRetain = new 
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+    HoodieInstant earliestCommitToRetain = new HoodieInstant(COMPLETED, 
"COMMIT", earliestInstant);
     Pair<Boolean, List<CleanFileInfo>> actual = 
cleanPlanner.getDeletePaths(partitionPath, Option.of(earliestCommitToRetain));
     assertEquals(expected, actual);
   }
 
+  /**
+   * The test asserts clean planner results for APIs 
org.apache.hudi.table.action.clean.CleanPlanner#getEarliestSavepoint()
+   * and 
org.apache.hudi.table.action.clean.CleanPlanner#getPartitionPathsToClean(org.apache.hudi.common.util.Option)
+   * given the other input arguments for clean metadata. The idea is that the two 
API results should match the expected results.
+   */
   @ParameterizedTest
   @MethodSource("incrCleaningPartitionsTestCases")
   void testPartitionsForIncrCleaning(boolean isPartitioned, HoodieWriteConfig 
config, String earliestInstant,
                                      String lastCompletedTimeInLastClean, 
String lastCleanInstant, String earliestInstantsInLastClean, List<String> 
partitionsInLastClean,
-                                     Map<String, List<String>> 
savepointsTrackedInLastClean, Map<String, List<String>> 
activeInstantsPartitions,
-                                     Map<String, List<String>> savepoints, 
List<String> expectedPartitions, boolean areCommitsForSavepointsRemoved) throws 
IOException {
+                                     Map<String, List<String>> 
savepointsTrackedInLastClean, Option<String> 
expectedEarliestSavepointInLastClean,
+                                     Map<String, List<String>> 
activeInstantsPartitions, List<String> replaceCommits, List<String> 
expectedPartitions, boolean areCommitsForSavepointsRemoved,
+                                     Map<String, List<String>> savepoints) 
throws IOException, IllegalAccessException {
     HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
     when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
     // setup savepoint mocks
@@ -158,9 +169,10 @@ public class TestCleanPlanner {
 
     // prepare last Clean Metadata
     Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadataOptionPair =
-        getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, 
earliestInstantsInLastClean, lastCompletedTimeInLastClean, 
savepointsTrackedInLastClean.keySet());
-    mockLastCleanCommit(mockHoodieTable, lastCleanInstant, 
earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
-    mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, 
savepointsTrackedInLastClean, areCommitsForSavepointsRemoved);
+        getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, 
earliestInstantsInLastClean, lastCompletedTimeInLastClean,
+            savepointsTrackedInLastClean.keySet(), 
expectedEarliestSavepointInLastClean);
+    HoodieCleanerPlan cleanerPlan = mockLastCleanCommit(mockHoodieTable, 
lastCleanInstant, earliestInstantsInLastClean, activeTimeline, 
cleanMetadataOptionPair, savepointsTrackedInLastClean.keySet());
+    mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, 
savepointsTrackedInLastClean, areCommitsForSavepointsRemoved, replaceCommits);
 
     // mock getAllPartitions
     HoodieStorage storage = mock(HoodieStorage.class);
@@ -169,10 +181,14 @@ public class TestCleanPlanner {
     when(mockHoodieTable.getMetadataTable()).thenReturn(hoodieTableMetadata);
     when(hoodieTableMetadata.getAllPartitionPaths()).thenReturn(isPartitioned 
? Arrays.asList(PARTITION1, PARTITION2, PARTITION3) : 
Collections.singletonList(StringUtils.EMPTY_STRING));
 
-    // Trigger clean and validate partitions to clean.
+    // Trigger clean and validate partitions to clean and earliest savepoint
     CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context, 
mockHoodieTable, config);
-    HoodieInstant earliestCommitToRetain = new 
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+    HoodieInstant earliestCommitToRetain = new HoodieInstant(COMPLETED, 
"COMMIT", earliestInstant);
     List<String> partitionsToClean = 
cleanPlanner.getPartitionPathsToClean(Option.of(earliestCommitToRetain));
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    when(metaClient.getActiveTimeline()).thenReturn(activeTimeline);
+    assertEquals(expectedEarliestSavepointInLastClean, 
ClusteringUtils.getEarliestReplacedSavepointInClean(activeTimeline, 
config.getCleanerPolicy(), cleanerPlan));
+
     Collections.sort(expectedPartitions);
     Collections.sort(partitionsToClean);
     assertEquals(expectedPartitions, partitionsToClean);
@@ -370,74 +386,113 @@ public class TestCleanPlanner {
 
     List<Arguments> arguments = new ArrayList<>();
 
+    // Code snippet below generates different test cases which will assert the 
clean planner output
+    // for these cases. There is no sequential dependency between these cases 
and they can be considered
+    // independent.
+    // In the test cases below earliestInstant signifies the new value of 
earliest instant to retain as computed
+    // by CleanPlanner. The test case provides the current state of the table and 
compares the expected values for
+    // earliestSavepointInLastClean and expectedPartitions against the computed 
values for the same by the clean planner
+
     // no savepoints tracked in last clean and no additional savepoints. all 
partitions in uncleaned instants should be expected
+    // earliest savepoint in last clean is empty since there is no savepoint 
in the timeline
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
-        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1), 
Collections.emptyMap(),
-        activeInstantsPartitionsMap3, Collections.emptyMap(), 
threePartitionsInActiveTimeline, false));
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1), 
Collections.emptyMap(), Option.empty(),
+        activeInstantsPartitionsMap3, Collections.emptyList(), 
threePartitionsInActiveTimeline, false, Collections.emptyMap()));
+
+    // a new savepoint is added after last clean. but rest of uncleaned 
touches all partitions, and so all partitions are expected
+    // earliest savepoint in last clean is empty even though savepoint2 was 
added to the timeline
+    // since there is no replace commit after the savepoint and before 
earliestInstantInLastClean (earliestInstantToRetain)
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), Option.empty(), 
activeInstantsPartitionsMap3,
+        Collections.emptyList(), threePartitionsInActiveTimeline, false, 
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1))));
 
     // a new savepoint is added after last clean. but rest of uncleaned 
touches all partitions, and so all partitions are expected
+    // earliest savepoint in last clean is empty even though savepoint2 was 
added to the timeline. This is because
+    // there are no replace commits between the savepoint and earliestInstant 
which is the earliestInstantToRetain
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
-        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1), 
Collections.emptyMap(),
-        activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline, 
false));
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1), 
Collections.emptyMap(), Option.empty(),
+        activeInstantsPartitionsMap3, Collections.emptyList(), 
threePartitionsInActiveTimeline, false,
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1))));
 
     // previous clean tracks a savepoint which exists in timeline still. only 
2 partitions are touched by uncleaned instants. only 2 partitions are expected
+    // earliest savepoint in last clean is empty even though savepoint2 was 
added to the timeline
+    // since there is no replace commit after the savepoint and before 
earliestInstantInLastClean (earliestInstantToRetain)
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
-        activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false));
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), Option.empty(),
+        activeInstantsPartitionsMap2, Collections.emptyList(), 
twoPartitionsInActiveTimeline, false,
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1))));
 
     // savepoint tracked in previous clean was removed(touching partition1). 
latest uncleaned touched 2 other partitions. But when savepoint is removed, 
entire list of partitions are expected.
+    // earliest savepoint in last clean is empty since there is no savepoint 
in the timeline
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
-        activeInstantsPartitionsMap2, Collections.emptyMap(), 
threePartitionsInActiveTimeline, false));
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), Option.empty(),
+        activeInstantsPartitionsMap2, Collections.emptyList(), 
threePartitionsInActiveTimeline, false, Collections.emptyMap()));
 
     // previous savepoint still exists and touches partition1. uncleaned 
touches only partition2 and partition3. expected partition2 and partition3.
+    // earliest savepoint in last clean is empty even though savepoint2 was 
added to the timeline
+    // since there is no replace commit after the savepoint and before 
earliestInstantInLastClean (earliestInstantToRetain)
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
-        activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false));
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), Option.empty(),
+        activeInstantsPartitionsMap2, 
Collections.singletonList(earliestInstantMinusThreeDays), 
twoPartitionsInActiveTimeline, false,
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1))));
 
     // a new savepoint was added compared to previous clean. all 2 partitions 
are expected since uncleaned commits touched just 2 partitions.
+    // earliest savepoint in last clean is same as savepoint3 since savepoint3 
is the oldest savepoint timestamp in the timeline
+    // and there is a replace commit earliestInstantMinusOneWeek after the 
savepoint but before earliestInstantInLastClean (earliestInstantToRetain)
     Map<String, List<String>> latestSavepoints = new HashMap<>();
-    latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
     latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION1));
+    latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
-        activeInstantsPartitionsMap2, latestSavepoints, 
twoPartitionsInActiveTimeline, false));
+        Collections.singletonMap(savepoint3, 
Collections.singletonList(PARTITION1)), Option.of(savepoint3),
+        activeInstantsPartitionsMap2, 
Collections.singletonList(earliestInstantMinusOneWeek), 
twoPartitionsInActiveTimeline, false, latestSavepoints));
 
     // 2 savepoints were tracked in previous clean. one of them is removed in 
latest. When savepoint is removed, entire list of partitions are expected.
+    // earliest savepoint in last clean is same as savepoint3 since savepoint3 
is part of the timeline
+    // and there is a replace commit earliestInstantMinusOneWeek after the 
savepoint but before earliestInstantInLastClean (earliestInstantToRetain)
     Map<String, List<String>> previousSavepoints = new HashMap<>();
     previousSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
     previousSavepoints.put(savepoint3, Collections.singletonList(PARTITION2));
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        previousSavepoints, activeInstantsPartitionsMap2, 
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), 
threePartitionsInActiveTimeline, false));
+        previousSavepoints, Option.of(savepoint3), 
activeInstantsPartitionsMap2, 
Collections.singletonList(earliestInstantMinusOneWeek), 
threePartitionsInActiveTimeline, false,
+        Collections.singletonMap(savepoint3, 
Collections.singletonList(PARTITION2))
+    ));
 
     // 2 savepoints were tracked in previous clean. one of them is removed in 
latest. But a partition part of removed savepoint is already touched by 
uncleaned commits.
     // Anyways, when savepoint is removed, entire list of partitions are 
expected.
+    // earliest savepoint in last clean is empty even though savepoint3 is 
part of the timeline
+    // since there is no replace commit between savepoint3 and 
earliestInstantInLastClean (earliestInstantToRetain)
+    // The replace commit earliestInstantMinusThreeDays is after the 
earliestInstantInLastClean
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        previousSavepoints, activeInstantsPartitionsMap3, 
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), 
threePartitionsInActiveTimeline, false));
+        previousSavepoints, Option.empty(), activeInstantsPartitionsMap3, 
Collections.singletonList(earliestInstantMinusThreeDays), 
threePartitionsInActiveTimeline, false,
+        Collections.singletonMap(savepoint3, 
Collections.singletonList(PARTITION2))
+    ));
 
     // unpartitioned test case. savepoint removed.
     List<String> unPartitionsInActiveTimeline = 
Arrays.asList(StringUtils.EMPTY_STRING);
     Map<String, List<String>> activeInstantsUnPartitionsMap = new HashMap<>();
     activeInstantsUnPartitionsMap.put(earliestInstantMinusThreeDays, 
unPartitionsInActiveTimeline);
 
+    // earliest savepoint in last clean is empty since there is no savepoint 
in the timeline
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(false,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(StringUtils.EMPTY_STRING),
-        Collections.singletonMap(savepoint2, 
Collections.singletonList(StringUtils.EMPTY_STRING)),
-        activeInstantsUnPartitionsMap, Collections.emptyMap(), 
unPartitionsInActiveTimeline, false));
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(StringUtils.EMPTY_STRING)), Option.empty(),
+        activeInstantsUnPartitionsMap, Collections.emptyList(), 
unPartitionsInActiveTimeline, false, Collections.emptyMap()));
 
     // savepoint tracked in previous clean was removed(touching partition1). 
active instants does not have the instant corresponding to the savepoint.
     // latest uncleaned touched 2 other partitions. But when savepoint is 
removed, entire list of partitions are expected.
+    // earliest savepoint in last clean is empty since there is no savepoint 
in the timeline
     activeInstantsPartitionsMap2.remove(earliestInstantMinusOneWeek);
     
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
         earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
-        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
-        activeInstantsPartitionsMap2, Collections.emptyMap(), 
threePartitionsInActiveTimeline, true));
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), Option.empty(),
+        activeInstantsPartitionsMap2, Collections.emptyList(), 
threePartitionsInActiveTimeline, true, Collections.emptyMap()));
 
     return arguments.stream();
   }
@@ -467,21 +522,29 @@ public class TestCleanPlanner {
         Arguments.of(getCleanByCommitsConfig(), earliestInstant, 
allFileGroups, savepoints, replacedFileGroups, expected));
   }
 
-  // helper to build common cases for the two policies
+  /**
+   * The function generates test arguments for two cases. One where cleaning 
policy
+   * is set to KEEP_LATEST_BY_HOURS and the other where cleaning policy is set 
to
+   * KEEP_LATEST_COMMITS. The rest of the arguments are the same.
+   */
   private static List<Arguments> 
buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(boolean 
isPartitioned, String earliestInstant,
                                                                                
                  String latestCompletedInLastClean,
                                                                                
                  String lastKnownCleanInstantTime,
                                                                                
                  String earliestInstantInLastClean,
                                                                                
                  List<String> partitionsInLastClean,
                                                                                
                  Map<String, List<String>> savepointsTrackedInLastClean,
+                                                                               
                  Option<String> expectedEarliestSavepointInLastClean,
                                                                                
                  Map<String, List<String>> activeInstantsToPartitionsMap,
-                                                                               
                  Map<String, List<String>> savepoints,
+                                                                               
                  List<String> replaceCommits,
                                                                                
                  List<String> expectedPartitions,
-                                                                               
                  boolean areCommitsForSavepointsRemoved) {
+                                                                               
                  boolean areCommitsForSavepointsRemoved,
+                                                                               
                  Map<String, List<String>> savepoints) {
     return Arrays.asList(Arguments.of(isPartitioned, getCleanByHoursConfig(), 
earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
-            earliestInstantInLastClean, partitionsInLastClean, 
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, 
expectedPartitions, areCommitsForSavepointsRemoved),
+            earliestInstantInLastClean, partitionsInLastClean, 
savepointsTrackedInLastClean, expectedEarliestSavepointInLastClean,
+            activeInstantsToPartitionsMap, replaceCommits, expectedPartitions, 
areCommitsForSavepointsRemoved, savepoints),
         Arguments.of(isPartitioned, getCleanByCommitsConfig(), 
earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
-            earliestInstantInLastClean, partitionsInLastClean, 
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, 
expectedPartitions, areCommitsForSavepointsRemoved));
+            earliestInstantInLastClean, partitionsInLastClean, 
savepointsTrackedInLastClean, expectedEarliestSavepointInLastClean,
+            activeInstantsToPartitionsMap, replaceCommits, expectedPartitions, 
areCommitsForSavepointsRemoved, savepoints));
   }
 
   private static HoodieFileGroup buildFileGroup(List<String> 
baseFileCommitTimes) {
@@ -492,7 +555,7 @@ public class TestCleanPlanner {
     String fileGroup = UUID.randomUUID() + "-0";
     HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, 
UUID.randomUUID().toString());
     HoodieTimeline timeline = mock(HoodieTimeline.class);
-    when(timeline.lastInstant()).thenReturn(Option.of(new 
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", 
baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
+    when(timeline.lastInstant()).thenReturn(Option.of(new 
HoodieInstant(COMPLETED, "COMMIT", 
baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
     HoodieFileGroup group = new HoodieFileGroup(fileGroupId, timeline);
     for (String baseFileCommitTime : baseFileCommitTimes) {
       
when(timeline.containsOrBeforeTimelineStarts(baseFileCommitTime)).thenReturn(true);
@@ -516,15 +579,13 @@ public class TestCleanPlanner {
   }
 
   private static Pair<HoodieCleanMetadata, Option<byte[]>> 
getCleanCommitMetadata(List<String> partitions, String instantTime, String 
earliestCommitToRetain,
-                                                                               
   String lastCompletedTime, Set<String> savepointsToTrack) {
+                                                                               
   String lastCompletedTime, Set<String> savepointsToTrack, Option<String> 
earliestCommitToNotArchive) {
     try {
       Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
       partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieCleanPartitionMetadata(partition, 
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
           Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), false)));
       Map<String, String> extraMetadata = new HashMap<>();
-      if (!savepointsToTrack.isEmpty()) {
-        extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
-      }
+      extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
       HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
           CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, 
extraMetadata.isEmpty() ? null : extraMetadata);
       return Pair.of(cleanMetadata, 
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
@@ -545,18 +606,23 @@ public class TestCleanPlanner {
     }
   }
 
-  private static void mockLastCleanCommit(HoodieTable hoodieTable, String 
timestamp, String earliestCommitToRetain, HoodieActiveTimeline activeTimeline,
-                                          Pair<HoodieCleanMetadata, 
Option<byte[]>> cleanMetadata)
+  private static HoodieCleanerPlan mockLastCleanCommit(HoodieTable 
hoodieTable, String timestamp, String earliestCommitToRetain, 
HoodieActiveTimeline activeTimeline,
+                                                       
Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadata, Set<String> 
savepointsTrackedInLastClean)
       throws IOException {
     HoodieDefaultTimeline cleanTimeline = mock(HoodieDefaultTimeline.class);
     when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
     when(hoodieTable.getCleanTimeline()).thenReturn(cleanTimeline);
     HoodieDefaultTimeline completedCleanTimeline = 
mock(HoodieDefaultTimeline.class);
     
when(cleanTimeline.filterCompletedInstants()).thenReturn(completedCleanTimeline);
-    HoodieInstant latestCleanInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, 
timestamp);
+    HoodieInstant latestCleanInstant = new HoodieInstant(COMPLETED, 
HoodieTimeline.CLEAN_ACTION, timestamp);
     
when(completedCleanTimeline.lastInstant()).thenReturn(Option.of(latestCleanInstant));
     when(activeTimeline.isEmpty(latestCleanInstant)).thenReturn(false);
     
when(activeTimeline.getInstantDetails(latestCleanInstant)).thenReturn(cleanMetadata.getRight());
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant(earliestCommitToRetain, HoodieTimeline.COMMIT_ACTION, 
COMPLETED.name()),
+        cleanMetadata.getLeft().getLastCompletedCommitTimestamp(),
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(), 
Collections.emptyMap(),
+        CleanPlanner.LATEST_CLEAN_PLAN_VERSION, null, null, 
cleanMetadata.getLeft().getExtraMetadata());
+    
when(activeTimeline.readCleanerInfoAsBytes(any())).thenReturn(Option.of(TimelineMetadataUtils.serializeAvroMetadata(cleanerPlan,
 HoodieCleanerPlan.class).get()));
 
     HoodieDefaultTimeline commitsTimeline = mock(HoodieDefaultTimeline.class);
     when(activeTimeline.getCommitsTimeline()).thenReturn(commitsTimeline);
@@ -564,10 +630,11 @@ public class TestCleanPlanner {
 
     when(hoodieTable.isPartitioned()).thenReturn(true);
     when(hoodieTable.isMetadataTable()).thenReturn(false);
+    return cleanerPlan;
   }
 
   private static void mockFewActiveInstants(HoodieTable hoodieTable, 
Map<String, List<String>> activeInstantsToPartitions,
-                                            Map<String, List<String>> 
savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved)
+                                            Map<String, List<String>> 
savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved, List<String> 
replaceCommits)
       throws IOException {
     HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
     List<HoodieInstant> instants = new ArrayList<>();
@@ -577,7 +644,7 @@ public class TestCleanPlanner {
       instantstoProcess.putAll(savepointedCommitsToAdd);
     }
     instantstoProcess.forEach((k, v) -> {
-      HoodieInstant hoodieInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, k);
+      HoodieInstant hoodieInstant = new HoodieInstant(COMPLETED, 
HoodieTimeline.COMMIT_ACTION, k);
       instants.add(hoodieInstant);
       Map<String, List<HoodieWriteStat>> partitionToWriteStats = new 
HashMap<>();
       v.forEach(partition -> partitionToWriteStats.put(partition, 
Collections.emptyList()));
@@ -593,7 +660,23 @@ public class TestCleanPlanner {
     });
 
     commitsTimeline.setInstants(instants);
+    Collections.sort(instants);
+    
when(hoodieTable.getActiveTimeline().getInstantsAsStream()).thenReturn(instants.stream());
     
when(hoodieTable.getCompletedCommitsTimeline()).thenReturn(commitsTimeline);
+
+    HoodieDefaultTimeline savepointTimeline = new HoodieDefaultTimeline();
+    List<HoodieInstant> savepointInstants = 
savepointedCommitsToAdd.keySet().stream().map(sp -> new 
HoodieInstant(COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, sp))
+        .collect(Collectors.toList());
+    savepointTimeline.setInstants(savepointInstants);
+
+    HoodieDefaultTimeline completedReplaceTimeline = new 
HoodieDefaultTimeline();
+    List<HoodieInstant> completedReplaceInstants = 
replaceCommits.stream().map(rc -> new HoodieInstant(COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, rc))
+        .collect(Collectors.toList());
+    completedReplaceTimeline.setInstants(completedReplaceInstants);
+
+    
when(hoodieTable.getActiveTimeline().findInstantsAfterOrEquals(any())).thenCallRealMethod();
+    
when(hoodieTable.getActiveTimeline().getCompletedReplaceTimeline()).thenReturn(completedReplaceTimeline);
+    
when(hoodieTable.getActiveTimeline().getSavePointTimeline()).thenReturn(savepointTimeline);
     when(hoodieTable.isPartitioned()).thenReturn(true);
     when(hoodieTable.isMetadataTable()).thenReturn(false);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index cb4a871cab1..435b5543f1c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -48,6 +48,7 @@ import 
org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
@@ -287,7 +288,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
         verifyArchival(
             getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
             getActiveCommitInstants(Arrays.asList("00000003", "00000004", 
"00000005", "00000006")),
-            commitsAfterArchival);
+            commitsAfterArchival, false);
       } else if (i < 8) {
         assertEquals(originalCommits, commitsAfterArchival);
       } else if (i == 8) {
@@ -295,7 +296,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
         verifyArchival(
             getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", 
"00000003", "00000004")),
             getActiveCommitInstants(Arrays.asList("00000005", "00000006", 
"00000007", "00000008")),
-            commitsAfterArchival);
+            commitsAfterArchival, false);
       } else {
         assertEquals(originalCommits, commitsAfterArchival);
       }
@@ -481,13 +482,13 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       // retains only 2 commits. C3 and C8. and savepointed commit for C3.
       verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002", "00000004", "00000005")),
           Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", 
"00000006")).stream(), 
getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
-              .collect(Collectors.toList()), commitsAfterArchival);
+              .collect(Collectors.toList()), commitsAfterArchival, true);
     } else {
       // archives only C1 and C2. stops at first savepointed commit C3.
       verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
           Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000005", "00000006")).stream(),
                   
getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
-              .collect(Collectors.toList()), commitsAfterArchival);
+              .collect(Collectors.toList()), commitsAfterArchival, false);
     }
 
     for (int i = 7; i < 10; i++) {
@@ -502,7 +503,112 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
 
     metaClient.reloadActiveTimeline();
     verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002", "00000003", "00000004", "00000005", "00000006", "00000007")),
-        getActiveCommitInstants(Arrays.asList("00000008", "00000009")), 
commitsAfterArchival);
+        getActiveCommitInstants(Arrays.asList("00000008", "00000009")), 
commitsAfterArchival, false);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testClusteringCommitAndSavepointWithArchival(boolean 
archiveBeyondSavepoint) throws Exception {
+    boolean enableMetadata = false;
+    HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, 
HoodieTableType.COPY_ON_WRITE,
+        10, HoodieFailedWritesCleaningPolicy.EAGER, 
WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
+
+    // min archival commits is 2 and max archival commits is 4. and so, after 
5th commit, 3 commits will be archived.
+    for (int i = 1; i < 8; i++) {
+      if (i < 3 || i == 5) {
+        testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : 
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      } else {
+        testTable.doCluster(String.format("%08d", i), 
Collections.singletonMap("p1", Collections.singletonList("f1")), 
Collections.singletonList("p2"), 2);
+      }
+    }
+
+    // savepoint 3rd commit
+    String commitToSavepoint = String.format("%08d", 3);
+    HoodieSavepointMetadata savepointMetadata = 
testTable.doSavepoint(commitToSavepoint);
+    testTable.addSavepoint(commitToSavepoint, savepointMetadata);
+
+    // trigger archival
+    Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+    List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+    // Only commits 1, 2 should be archived since savepoint exists at c3. In 
the case when
+    // archiveBeyondSavepoint is false, archival is blocked since replace 
commits have not yet been cleaned
+    List<HoodieInstant> expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000005"));
+    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+    
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+    verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
+        expectedActiveInstants, commitsAfterArchival, false);
+
+    // add a clean commit with earliest instant to retain as c8 and earliest 
savepoint as c3.
+    // After this commit clean along with snapshot blocks archival even though 
earliest instant to retain is c8
+    Map<String, Integer> cleanStats = new HashMap<>();
+    cleanStats.put("p1", 1);
+    cleanStats.put("p2", 2);
+    testTable.doClean(String.format("%08d", 8), cleanStats,
+        Collections.singletonMap(CleanerUtils.SAVEPOINTED_TIMESTAMPS, 
"00000003"));
+
+    // trigger archival
+    commitsList = archiveAndGetCommitsList(writeConfig);
+    commitsAfterArchival = commitsList.getValue();
+    // archives only C1 and C2. stops at first replace commit C3 after 
savepoint based on cleaner
+    expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000005"));
+    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+    
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008"),
 HoodieTimeline.CLEAN_ACTION));
+    verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
+        expectedActiveInstants, commitsAfterArchival, false);
+
+    // add a clean commit with earliest savepoint set as c7
+    testTable.doClean(String.format("%08d", 9), cleanStats, 
Collections.singletonMap(CleanerUtils.SAVEPOINTED_TIMESTAMPS, "00000007"));
+
+    // trigger archival
+    commitsList = archiveAndGetCommitsList(writeConfig);
+    commitsAfterArchival = commitsList.getValue();
+
+    if (archiveBeyondSavepoint) {
+      // retains the 2 commits - C3 and C7. Since minInstantsToKeep is 2, c3 
is retained. Archival is now blocked at
+      // c7 since that is the replace commit after earliest savepoint c7 in 
cleaner
+      expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000003", "00000007"), 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+      List<HoodieInstant> archivedCommitInstants = 
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
+      
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004",
 "00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+      verifyArchival(archivedCommitInstants, expectedActiveInstants, 
commitsAfterArchival, true);
+    } else {
+      // archives only C1 and C2. stops at c3 since clean earliest savepoint 
is c3.
+      expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000005"));
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+      
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
+          expectedActiveInstants, commitsAfterArchival, false);
+    }
+
+    // savepoint is removed
+    testTable.deleteSavepoint(commitToSavepoint);
+
+    // trigger archival
+    commitsList = archiveAndGetCommitsList(writeConfig);
+    commitsAfterArchival = commitsList.getValue();
+
+    if (archiveBeyondSavepoint) {
+      // change from last state - Removal of savepoint instant from the active 
timeline since it is deleted
+      expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000003", "00000007"), 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      List<HoodieInstant> archivedCommitInstants = 
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
+      
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004",
 "00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+      verifyArchival(archivedCommitInstants, expectedActiveInstants, 
commitsAfterArchival, true);
+    } else {
+      // change from last state - Removal of savepoint instant from the active 
timeline since it is deleted
+      // since savepoint is now deleted, it does not block archival.
+      // archival is triggered since clean also does not block it
+      // c6 and c7 are retained since min instants to keep is 2
+      expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000006", "00000007"), 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      List<HoodieInstant> archivedCommitInstants = 
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
+      
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000003",
 "00000004"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+      verifyArchival(archivedCommitInstants, expectedActiveInstants, 
commitsAfterArchival, false);
+    }
   }
 
   @Test
@@ -869,7 +975,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
             .map(p -> new HoodieInstant(State.COMPLETED, p.getValue(), 
p.getKey())).collect(Collectors.toList());
         List<HoodieInstant> expectedActiveInstants = 
instants.subList(numArchivedInstants, instants.size()).stream()
             .map(p -> new HoodieInstant(State.COMPLETED, p.getValue(), 
p.getKey())).collect(Collectors.toList());
-        verifyArchival(expectedArchivedInstants, expectedActiveInstants, 
commitsAfterArchival);
+        verifyArchival(expectedArchivedInstants, expectedActiveInstants, 
commitsAfterArchival, false);
       }
     }
     assertTrue(hasArchivedInstants, "Some instants should be archived");
@@ -939,7 +1045,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     archivedInstants.add(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "00000002"));
     verifyArchival(archivedInstants,
         getActiveCommitInstants(Arrays.asList("00000005", "00000006", 
"00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION),
-        commitsAfterArchival);
+        commitsAfterArchival, false);
   }
 
   @ParameterizedTest
@@ -1032,7 +1138,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 8);
     // Min archival commits is 2 and max archival commits is 4.
     // When metadata table is not enabled, after 5th write instant, archive 
will be triggered.
-    // When metadata table is enabled, after 8th instant (6 write instants + 2 
clean instants) >= maxDeltaCommitsMetadataTable,
+    // When metadata table is enabled, after 8th instant (5 write instants + 3 
clean instants) >= maxDeltaCommitsMetadataTable,
     // archival kicks in when compaction in metadata table triggered.
     Map<String, Integer> cleanStats = new HashMap<>();
     cleanStats.put("p1", 1);
@@ -1040,7 +1146,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     for (int i = 1; i <= 8; i++) {
       if (i == 1) {
         testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", 
"p2"), 20);
-      } else if (i <= 3) {
+      } else if (i <= 3 || i == 5) {
         testTable.doClean(String.format("%08d", i), cleanStats);
       } else {
         testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 
2);
@@ -1049,39 +1155,83 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
       List<HoodieInstant> originalCommits = commitsList.getKey();
       List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
-      if (i < 7) {
+      if (i <= 7) {
         assertEquals(originalCommits, commitsAfterArchival);
-      } else if (i == 7) {
-        if (!enableMetadata) {
-          // do archive:
-          // clean: 2,3: after archival -> null
-          // write: 1,4,5,6,7: after archival -> 6, 7
-          List<HoodieInstant> expectedActiveInstants = new 
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000006", "00000007")));
-
-          List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
-          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000004", "00000005")));
-          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003"), HoodieTimeline.CLEAN_ACTION));
-
-          verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival);
-        } else {
-          // with metadata enabled, archival in data table is fenced based on 
compaction in metadata table.
-          assertEquals(originalCommits, commitsAfterArchival);
-        }
       } else {
-        if (!enableMetadata) {
-          assertEquals(originalCommits, commitsAfterArchival);
-        } else {
-          // when i == 8 compaction in metadata table will be triggered, and 
then allow archive:
-          // clean: 2,3: after archival -> null
-          // write: 1,4,5,6,7,8: after archival -> 7, 8
-          List<HoodieInstant> expectedActiveInstants = new 
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
-
-          List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
-          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000004", "00000005", "00000006")));
-          
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003"), HoodieTimeline.CLEAN_ACTION));
+        // when i == 8 compaction in metadata table will be triggered, and 
then allow archive:
+        // clean: 2,3,5: all will be archived
+        // write: 1,4,6,7,8: after archival -> 7,8
+        List<HoodieInstant> expectedActiveInstants = new 
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
+        List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+        
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000004", "00000006")));
+        
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003", "00000005"), HoodieTimeline.CLEAN_ACTION));
+
+        verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival, false);
+      }
+    }
+  }
 
-          verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival);
+  @Test
+  public void testArchiveTableWithCleanerEarliestSavepoint() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 4, 
5, 2);
+
+    // min archival commits is 4 and max archival commits is 5
+    // (commits have to be greater than 5)
+    // and so, after 6th instant, 2 instants will be archived.
+    // Instant 1 -> 10 are commits except 6 which is a clean instant.
+    // Clean instants have SAVEPOINTED_TIMESTAMPS set as 3 till clean instant 
14.
+    // At 16th clean instant, SAVEPOINTED_TIMESTAMPS is set as 10.
+    Map<String, Integer> cleanStats = new HashMap<>();
+    cleanStats.put("p1", 1);
+    cleanStats.put("p2", 2);
+    for (int i = 1; i <= 16; i++) {
+      if (i == 7) {
+        testTable.doCluster(String.format("%08d", i), Collections.emptyMap(), 
Arrays.asList("p1", "p2"), 20);
+      } else if (i != 6 && i <= 12) {
+        testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : 
Collections.emptyList(), Arrays.asList("p1", "p2"), 20);
+      } else {
+        String savepoint = "00000003";
+        if (i == 16) {
+          // only 16th clean commit has set savepoint as 10. Before 16th clean 
commit only commits
+          // 1 and 2 should be archived.
+          savepoint = "00000010";
         }
+        testTable.doClean(String.format("%08d", i), cleanStats, 
Collections.singletonMap(CleanerUtils.SAVEPOINTED_TIMESTAMPS, savepoint));
+      }
+      // trigger archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+      if (i <= 6) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else if (i <= 10) {
+        // clean instants have not been archived yet, SAVEPOINTED_TIMESTAMPS 
is set as 3 but since there are no replace commits
+        // between earliest savepoint 3 and earliest instant to retain which 
is 6, archival is not blocked at 3 but at commit 6
+        // Archival gets triggered at commit 7 and 9, these would remove 
commits (1,2) and (3,4) respectively
+        // Other commits do not trigger archival since maxInstantsToKeep is 5 
and after archival instants are 4
+        assertEquals(5 + (i % 2 == 0 ? 1 : 0), commitsAfterArchival.size(), 
commitsAfterArchival.toString());
+      } else if (i < 16) {
+        // Archival gets triggered at 11 and it archives commits before 6 
since archival is blocked at commit 6
+        // due to cleaner earliest instant to retain set at 6
+        // clean instants have not been archived yet, SAVEPOINTED_TIMESTAMPS 
is set as 3 but since there are no replace commits
+        // between earliest savepoint 3 and earliest instant to retain which 
is 6, archival is not blocked at 3 but at commit 6
+        assertEquals(i - 5, commitsAfterArchival.size(), 
commitsAfterArchival.toString());
+      } else {
+        // At 16th clean instant, SAVEPOINTED_TIMESTAMPS is set as 10
+        // There are no replace commits between earliest savepoint (10) and 
cleaner earliest instant to retain (16)
+        // Therefore archival will not be blocked by savepoint
+        // active commits were 3,4,5,7,8,9,10,11,12 => After archival only 4 
instants would remain 9,10,11,12 based on archival config
+        // clean instants were 6,13,14,15,16 => clean instant 6 would be 
archived since commits instants are archived till commit 8
+        List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
+        
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009", 
"00000010", "00000011", "00000012")));
+        expectedActiveInstants.addAll(
+            getActiveCommitInstants(Arrays.asList("00000013", "00000014", 
"00000015", "00000016"), HoodieTimeline.CLEAN_ACTION));
+        List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
+        expectedArchivedInstants.addAll(getAllArchivedCommitInstants(
+            Arrays.asList("00000001", "00000002", "00000003", "00000004", 
"00000005", "00000008"), HoodieTimeline.COMMIT_ACTION));
+        
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000006"),
 HoodieTimeline.CLEAN_ACTION));
+        
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000007"),
 HoodieTimeline.REPLACE_COMMIT_ACTION));
+        verifyArchival(expectedArchivedInstants, expectedActiveInstants, 
commitsAfterArchival, false);
       }
     }
   }
@@ -1137,7 +1287,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     List<HoodieInstant> expectedActiveInstants = instants.subList(archived, 
instants.size()).stream()
         .map(p -> new HoodieInstant(State.COMPLETED, p.getValue(), 
p.getKey())).collect(Collectors.toList());
 
-    verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival);
+    verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival, false);
   }
 
   @ParameterizedTest
@@ -1168,6 +1318,9 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, 
HoodieTimeline.ROLLBACK_ACTION, String.format("%02d", startInstant)));
     }
 
+    // Clean and rollback instants are archived only till the last clean 
instant in the timeline
+    createCleanMetadata(String.format("%02d", startInstant), false, false, 
isEmpty);
+
     if (enableMetadataTable) {
       // Simulate a compaction commit in metadata table timeline
       // so the archival in data table can happen
@@ -1231,7 +1384,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     // after archival 4,5,6,7
     assertEquals(originalCommits.size() - commitsAfterArchival.size(), 3);
     verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 3)),
-        getActiveCommitInstants(instants.subList(3, 7)), commitsAfterArchival);
+        getActiveCommitInstants(instants.subList(3, 7)), commitsAfterArchival, 
false);
 
     // 3 more commits, 4 to 6 will be archived. but will not move after 6 
since compaction has to kick in metadata table.
     for (int i = 0; i < 3; i++) {
@@ -1244,7 +1397,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     originalCommits = commitsList.getKey();
     commitsAfterArchival = commitsList.getValue();
     assertEquals(originalCommits.size() - commitsAfterArchival.size(), 3);
-    verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)), 
getActiveCommitInstants(instants.subList(6, 10)), commitsAfterArchival);
+    verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)), 
getActiveCommitInstants(instants.subList(6, 10)), commitsAfterArchival, false);
 
     // No archival should kick in since compaction has not kicked in metadata 
table
     for (int i = 0; i < 2; i++) {
@@ -1256,7 +1409,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     originalCommits = commitsList.getKey();
     commitsAfterArchival = commitsList.getValue();
     assertEquals(originalCommits, commitsAfterArchival);
-    verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)), 
getActiveCommitInstants(instants.subList(6, 12)), commitsAfterArchival);
+    verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)), 
getActiveCommitInstants(instants.subList(6, 12)), commitsAfterArchival, false);
 
     String instant13 = metaClient.createNewInstantTime();
     instants.add(instant13);
@@ -1278,7 +1431,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     // before archival 7,8,9,10,11,12,13,14
     // after archival 11,12,13,14
     assertEquals(originalCommits.size() - commitsAfterArchival.size(), 4);
-    verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 10)), 
getActiveCommitInstants(instants.subList(10, 14)), commitsAfterArchival);
+    verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 10)), 
getActiveCommitInstants(instants.subList(10, 14)), commitsAfterArchival, false);
   }
 
   @ParameterizedTest
@@ -1675,7 +1828,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     return Pair.of(originalCommits, commitsAfterArchival);
   }
 
-  private void verifyArchival(List<HoodieInstant> expectedArchivedInstants, 
List<HoodieInstant> expectedActiveInstants, List<HoodieInstant> 
commitsAfterArchival) {
+  private void verifyArchival(List<HoodieInstant> expectedArchivedInstants, 
List<HoodieInstant> expectedActiveInstants, List<HoodieInstant> 
commitsAfterArchival, boolean isArchivalBeyondSavepoint) {
     
expectedActiveInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
     
commitsAfterArchival.sort(Comparator.comparing(HoodieInstant::getTimestamp));
     assertEquals(expectedActiveInstants, commitsAfterArchival);
@@ -1689,7 +1842,13 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline();
     expectedArchivedInstants.forEach(entry -> {
           // check safety
-          if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
+          // when isArchivalBeyondSavepoint is set to true, there is a case 
possible where archival instant
+          // is after the first completed instant in the timeline. Let's say 
savepoint was taken at commit c1
+          // and commit c1 is a deltacommit which is not archived but c2 got 
archived. If savepoint is deleted
+          // then containsOrBeforeTimelineStarts check would fail for c2 
archived commit as the function compares
+          // against first non savepoint commit in the timeline which is now 
deleted. With c1 it would succeed but
+          // once c1 is removed, it would start failing.
+          if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && 
!isArchivalBeyondSavepoint) {
             
assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()), 
"Archived commits should always be safe");
           }
         }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index bc6dd6094c5..8261aaf6538 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -290,6 +290,12 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
         .limit(numCommits), details);
   }
 
+  @Override
+  public HoodieTimeline findInstantsAfterOrEquals(String commitTime) {
+    return new HoodieDefaultTimeline(getInstantsAsStream()
+        .filter(s -> compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, commitTime)), details);
+  }
+
   @Override
   public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
     return new HoodieDefaultTimeline(getInstantsAsStream()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 4ec700fe577..80ba8565507 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.StringUtils;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.BiPredicate;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
@@ -250,10 +251,15 @@ public interface HoodieTimeline extends Serializable {
   HoodieTimeline filterRequestedRollbackTimeline();
 
   /**
-   * Create a new Timeline with all the instants after startTs.
+   * Create a new Timeline with at most numCommits instants after or equal to startTs.
    */
   HoodieTimeline findInstantsAfterOrEquals(String commitTime, int numCommits);
 
+  /**
+   * Create a new Timeline with all the instants after or equal to startTs.
+   */
+  HoodieTimeline findInstantsAfterOrEquals(String commitTime);
+
   /**
    * Create a new Timeline with instants after startTs and before or on endTs.
    */
@@ -455,6 +461,29 @@ public interface HoodieTimeline extends Serializable {
     return predicateToApply.test(commit1, commit2);
   }
 
+  /**
+   * Returns the smaller of the two given timestamps. Returns the non-null 
argument if one of the arguments is null.
+   */
+  static String minTimestamp(String commit1, String commit2) {
+    if (StringUtils.isNullOrEmpty(commit1)) {
+      return commit2;
+    } else if (StringUtils.isNullOrEmpty(commit2)) {
+      return commit1;
+    }
+    return minInstant(commit1, commit2);
+  }
+
+  /**
+   * Returns the smaller of the two given instants compared by their respective 
timestamps.
+   * Returns the non-null argument if one of the arguments is null.
+   */
+  static HoodieInstant minTimestampInstant(HoodieInstant instant1, 
HoodieInstant instant2) {
+    String commit1 = instant1 != null ? instant1.getTimestamp() : null;
+    String commit2 = instant2 != null ? instant2.getTimestamp() : null;
+    String minTimestamp = minTimestamp(commit1, commit2);
+    return Objects.equals(minTimestamp, commit1) ? instant1 : instant2;
+  }
+
   /**
    * Returns the smaller of the given two instants.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 0fa758c21e1..f1488f31318 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -57,7 +57,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
 public class CleanerUtils {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CleanerUtils.class);
-
+  public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";
   public static final Integer CLEAN_METADATA_VERSION_1 = 
CleanMetadataV1MigrationHandler.VERSION;
   public static final Integer CLEAN_METADATA_VERSION_2 = 
CleanMetadataV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_METADATA_VERSION = 
CLEAN_METADATA_VERSION_2;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 4b4dddccd9b..691695dff45 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieClusteringStrategy;
@@ -27,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -333,12 +335,13 @@ public class ClusteringUtils {
    * Returns the earliest instant to retain.
    * Make sure the clustering instant won't be archived before cleaned, and 
the earliest inflight clustering instant has a previous commit.
    *
-   * @param activeTimeline The active timeline
-   * @param metaClient     The meta client
+   * @param activeTimeline               The active timeline
+   * @param metaClient                   The meta client
+   * @param cleanerPolicy                The hoodie cleaning policy
    * @return the earliest instant to retain for clustering
    */
   public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
-      HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) 
throws IOException {
+      HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient, 
HoodieCleaningPolicy cleanerPolicy) throws IOException {
     Option<HoodieInstant> oldestInstantToRetain = Option.empty();
     HoodieTimeline replaceOrClusterTimeline = 
activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION,
 HoodieTimeline.CLUSTERING_ACTION));
     if (!replaceOrClusterTimeline.empty()) {
@@ -348,13 +351,14 @@ public class ClusteringUtils {
         // The first clustering instant of which timestamp is greater than or 
equal to the earliest commit to retain of
         // the clean metadata.
         HoodieInstant cleanInstant = cleanInstantOpt.get();
-        HoodieActionInstant earliestInstantToRetain = 
CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested()
-                ? cleanInstant
-                : 
HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
-            .getEarliestInstantToRetain();
+        HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested() ? 
cleanInstant : 
HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()));
+        Option<String> earliestInstantToRetain = 
Option.ofNullable(cleanerPlan.getEarliestInstantToRetain()).map(HoodieActionInstant::getTimestamp);
         String retainLowerBound;
-        if (earliestInstantToRetain != null && 
!StringUtils.isNullOrEmpty(earliestInstantToRetain.getTimestamp())) {
-          retainLowerBound = earliestInstantToRetain.getTimestamp();
+        Option<String> earliestReplacedSavepointInClean = 
getEarliestReplacedSavepointInClean(activeTimeline, cleanerPolicy, cleanerPlan);
+        if (earliestReplacedSavepointInClean.isPresent()) {
+          retainLowerBound = earliestReplacedSavepointInClean.get();
+        } else if (earliestInstantToRetain.isPresent() && 
!StringUtils.isNullOrEmpty(earliestInstantToRetain.get())) {
+          retainLowerBound = earliestInstantToRetain.get();
         } else {
           // no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS 
clean policy,
           // retain first instant after clean instant.
@@ -365,13 +369,7 @@ public class ClusteringUtils {
           // TODO: This case has to be handled. HUDI-6352
           retainLowerBound = cleanInstant.getTimestamp();
         }
-
-        oldestInstantToRetain = replaceOrClusterTimeline.filter(instant ->
-                HoodieTimeline.compareTimestamps(
-                    instant.getTimestamp(),
-                    HoodieTimeline.GREATER_THAN_OR_EQUALS,
-                    retainLowerBound))
-            .firstInstant();
+        oldestInstantToRetain = 
replaceOrClusterTimeline.findInstantsAfterOrEquals(retainLowerBound).firstInstant();
       } else {
         oldestInstantToRetain = replaceOrClusterTimeline.firstInstant();
       }
@@ -379,6 +377,43 @@ public class ClusteringUtils {
     return oldestInstantToRetain;
   }
 
+  public static Option<String> 
getEarliestReplacedSavepointInClean(HoodieActiveTimeline activeTimeline, 
HoodieCleaningPolicy cleanerPolicy,
+                                                                   
HoodieCleanerPlan cleanerPlan) {
+    // EarliestSavepoint in clean is required to block archival when savepoint 
is deleted.
+    // This ensures that archival is blocked until clean has cleaned up files 
retained due to savepoint.
+    // If this guard is not present, the archiving of commits can lead to 
duplicates. Here is a scenario
+    // illustrating this failure. The scenario considers a case where 
EarliestSavepoint guard is not present
+    // c1.dc - f1 (c1 deltacommit creates file with id f1)
+    // c2.dc - f2 (c2 deltacommit creates file with id f2)
+    // c2.sp - Savepoint at c2
+    // c3.rc (replacing f2 -> f3) (Replace commit replacing file id f2 with f3)
+    // c4.dc
+    //
+    // Let's say the incremental cleaner moved past c3.rc without cleaning f2 
since savepoint is created at c2.
+    // Archival is blocked at c2 since there is a savepoint at c2.
+    // Let's say the savepoint at c2 is now deleted; archival would archive 
c3.rc since it is unblocked now.
+    // Since c3 is archived and f2 has not been cleaned, the table view would 
be considering f2 as a valid
+    // file id. This causes duplicates.
+
+    String earliestInstantToRetain = 
Option.ofNullable(cleanerPlan.getEarliestInstantToRetain()).map(HoodieActionInstant::getTimestamp).orElse(null);
+    Option<String[]> savepoints = 
Option.ofNullable(cleanerPlan.getExtraMetadata()).map(metadata -> 
metadata.getOrDefault(CleanerUtils.SAVEPOINTED_TIMESTAMPS, 
StringUtils.EMPTY_STRING).split(","));
+    String earliestSavepoint = savepoints.flatMap(arr -> 
Option.fromJavaOptional(Arrays.stream(arr).sorted().findFirst())).orElse(null);
+    if (cleanerPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      if (!StringUtils.isNullOrEmpty(earliestInstantToRetain) && 
!StringUtils.isNullOrEmpty(earliestSavepoint)) {
+        // When earliestToRetainTs is greater than first savepoint timestamp 
and there are no
+        // replace commits between the first savepoint and the 
earliestToRetainTs, we can set the
+        // earliestSavepointOpt to empty as there was no cleaning blocked due 
to savepoint
+        if (HoodieTimeline.compareTimestamps(earliestInstantToRetain, 
HoodieTimeline.GREATER_THAN, earliestSavepoint)) {
+          HoodieTimeline replaceTimeline = 
activeTimeline.getCompletedReplaceTimeline().findInstantsInClosedRange(earliestSavepoint,
 earliestInstantToRetain);
+          if (!replaceTimeline.empty()) {
+            return Option.of(earliestSavepoint);
+          }
+        }
+      }
+    }
+    return Option.empty();
+  }
+
   /**
    * @param instant  Hudi instant to check.
    * @param timeline Hudi timeline.
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index da0f8c021ee..def0268e5c0 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -624,6 +624,17 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void testMinTimestamp() {
+    String timestamp1 = "20240601040632402";
+    String timestamp2 = "20250601040632402";
+    assertEquals(timestamp1, HoodieTimeline.minTimestamp(null, timestamp1));
+    assertEquals(timestamp1, HoodieTimeline.minTimestamp("", timestamp1));
+    assertEquals(timestamp1, HoodieTimeline.minTimestamp(timestamp1, null));
+    assertEquals(timestamp1, HoodieTimeline.minTimestamp(timestamp1, ""));
+    assertEquals(timestamp1, HoodieTimeline.minTimestamp(timestamp1, 
timestamp2));
+  }
+
   @Test
   public void testParseDateFromInstantTime() throws ParseException {
     // default second granularity instant ID
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 14c00111d21..4a0ef616b3e 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -1056,6 +1056,10 @@ public class HoodieTestTable implements AutoCloseable {
   }
 
   public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> 
partitionFileCountsToDelete) throws IOException {
+    return doClean(commitTime, partitionFileCountsToDelete, 
Collections.emptyMap());
+  }
+
+  public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> 
partitionFileCountsToDelete, Map<String, String> extraMetadata) throws 
IOException {
     Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
     for (Map.Entry<String, Integer> entry : 
partitionFileCountsToDelete.entrySet()) {
       partitionFilesToDelete.put(entry.getKey(), 
getEarliestFilesInPartition(entry.getKey(), entry.getValue()));
@@ -1066,8 +1070,11 @@ public class HoodieTestTable implements AutoCloseable {
       deleteFilesInPartition(entry.getKey(), entry.getValue());
     }
     Pair<HoodieCleanerPlan, HoodieCleanMetadata> cleanerMeta = 
getHoodieCleanMetadata(commitTime, testTableState);
-    addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue());
-    return cleanerMeta.getValue();
+    HoodieCleanMetadata cleanMetadata = cleanerMeta.getValue();
+    cleanerMeta.getKey().setExtraMetadata(extraMetadata);
+    cleanMetadata.setExtraMetadata(extraMetadata);
+    addClean(commitTime, cleanerMeta.getKey(), cleanMetadata);
+    return cleanMetadata;
   }
 
   /**
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 7ac82c8e7ae..492240138b5 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -165,7 +165,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     HoodieInstant inflightInstant3 = 
metaClient.getActiveTimeline().transitionClusterRequestedToInflight(requestedInstant3,
 Option.empty());
     HoodieInstant completedInstant3 = 
metaClient.getActiveTimeline().transitionClusterInflightToComplete(true, 
inflightInstant3, Option.empty());
     metaClient.reloadActiveTimeline();
-    Option<HoodieInstant> actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
+    Option<HoodieInstant> actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient, null);
     assertTrue(actual.isPresent());
     assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean in 
timeline, retain first replace commit");
 
@@ -187,7 +187,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, 
inflightInstant4,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
     metaClient.reloadActiveTimeline();
-    actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
+    actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient, null);
     assertEquals(clusterTime3, actual.get().getTimestamp(),
         "retain the first replace commit after the earliestInstantToRetain ");
   }
@@ -225,7 +225,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     metaClient.getActiveTimeline().transitionClusterInflightToComplete(true, 
inflightInstant3, Option.empty());
     metaClient.reloadActiveTimeline();
 
-    Option<HoodieInstant> actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient);
+    Option<HoodieInstant> actual = 
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
 metaClient, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
     assertEquals(clusterTime2, actual.get().getTimestamp(),
         "retain the first replace commit after the last complete clean ");
   }

Reply via email to