This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 687bf6b04511331d51dad398657554986d9ab13c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 07:44:17 2024 -0700

    [HUDI-7532] Include only compaction instants for lastCompaction in getDeltaCommitsSinceLatestCompaction (#10915)
    
    * Fixing schedule compaction bug
    
    * Addressing comments
    
    * Fixing CDC tests
---
 .../hudi/cli/commands/CompactionCommand.java       |  2 +-
 .../hudi/cli/commands/FileSystemViewCommand.java   |  2 +-
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  2 +-
 .../apache/hudi/cli/commands/RepairsCommand.java   |  4 +-
 .../org/apache/hudi/cli/commands/StatsCommand.java |  2 +-
 .../java/org/apache/hudi/cli/utils/CommitUtil.java |  2 +-
 .../apache/hudi/cli/commands/TestTableCommand.java |  6 +--
 .../hudi/cli/integ/ITTestSavepointsCommand.java    |  6 +--
 .../index/bucket/ConsistentBucketIndexUtils.java   |  2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../table/action/commit/JavaUpsertPartitioner.java |  2 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  | 14 +++---
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  |  2 +-
 .../testutils/HoodieJavaClientTestHarness.java     |  2 +-
 .../java/org/apache/hudi/client/TestMultiFS.java   |  4 +-
 .../hudi/client/TestTableSchemaEvolution.java      |  4 +-
 .../functional/TestHoodieBackedMetadata.java       | 14 +++---
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  8 +--
 .../java/org/apache/hudi/table/TestCleaner.java    |  2 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  6 +--
 .../table/action/compact/TestInlineCompaction.java |  6 +--
 .../TestCopyOnWriteRollbackActionExecutor.java     |  2 +-
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |  4 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  6 +--
 .../hudi/testutils/HoodieClientTestBase.java       |  2 +-
 .../SparkClientFunctionalTestHarness.java          |  4 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |  6 +--
 .../table/timeline/HoodieDefaultTimeline.java      | 11 +++-
 .../apache/hudi/common/util/CompactionUtils.java   |  3 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  2 +-
 .../common/table/TestHoodieTableMetaClient.java    |  8 +--
 .../hudi/common/table/TestTimelineUtils.java       | 12 ++---
 .../table/timeline/TestHoodieActiveTimeline.java   | 44 ++++++++++------
 .../hudi/common/util/TestCompactionUtils.java      | 58 ++++++++++++++++++++++
 .../RepairAddpartitionmetaProcedure.scala          |  2 +-
 .../RepairMigratePartitionMetaProcedure.scala      |  2 +-
 .../ShowHoodieLogFileRecordsProcedure.scala        |  2 +-
 .../StatsWriteAmplificationProcedure.scala         |  2 +-
 .../procedures/ValidateHoodieSyncProcedure.scala   |  2 +-
 .../src/test/java/HoodieJavaStreamingApp.java      |  4 +-
 .../hudi/functional/TestMORDataSourceStorage.scala |  2 +-
 .../hudi/functional/TestStructuredStreaming.scala  |  2 +-
 .../functional/cdc/TestCDCDataFrameSuite.scala     | 26 +++++-----
 .../sql/hudi/procedure/TestRepairsProcedure.scala  |  8 +--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |  4 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  2 +-
 47 files changed, 197 insertions(+), 119 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 1679a327007..6a297e868e0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -316,7 +316,7 @@ public class CompactionCommand {
         .filter(pair -> pair.getRight() != null)
         .collect(Collectors.toList());
 
-    Set<String> committedInstants = 
timeline.getCommitTimeline().filterCompletedInstants()
+    Set<String> committedInstants = 
timeline.getCommitAndReplaceTimeline().filterCompletedInstants()
         
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
 
     List<Comparable[]> rows = new ArrayList<>();
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index cbb2ae2177c..e9a3a3c922a 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -247,7 +247,7 @@ public class FileSystemViewCommand {
 
     HoodieTimeline timeline;
     if (basefileOnly) {
-      timeline = metaClient.getActiveTimeline().getCommitTimeline();
+      timeline = metaClient.getActiveTimeline().getCommitAndReplaceTimeline();
     } else if (excludeCompaction) {
       timeline = metaClient.getActiveTimeline().getCommitsTimeline();
     } else {
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 307ca81cea0..b4c72021ee6 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -232,7 +232,7 @@ public class HoodieLogFileCommand {
               .withReaderSchema(readerSchema)
               .withLatestInstantTime(
                   client.getActiveTimeline()
-                      .getCommitTimeline().lastInstant().get().getTimestamp())
+                      
.getCommitAndReplaceTimeline().lastInstant().get().getTimestamp())
               .withReverseReader(
                   Boolean.parseBoolean(
                       
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 8783e749057..2418976c4e4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -118,7 +118,7 @@ public class RepairsCommand {
 
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     String latestCommit =
-        
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
+        
client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp();
     List<String> partitionPaths =
         FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, 
client.getBasePath());
     StoragePath basePath = client.getBasePathV2();
@@ -239,7 +239,7 @@ public class RepairsCommand {
       Option<StoragePath> baseFormatFile =
           
HoodiePartitionMetadata.baseFormatMetaPathIfExists(HoodieCLI.storage, 
partition);
       String latestCommit =
-          
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
+          
client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp();
 
       String[] row = new String[] {
           partitionPath,
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index f8e60ba8cee..9f859bf72bf 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -69,7 +69,7 @@ public class StatsCommand {
     long totalRecordsWritten = 0;
 
     HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
-    HoodieTimeline timeline = 
activeTimeline.getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = 
activeTimeline.getCommitAndReplaceTimeline().filterCompletedInstants();
 
     List<Comparable[]> rows = new ArrayList<>();
     DecimalFormat df = new DecimalFormat("#.00");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index 21910fd956d..12322617fb2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -36,7 +36,7 @@ public class CommitUtil {
 
   public static long countNewRecords(HoodieTableMetaClient metaClient, 
List<String> commitsToCatchup) throws IOException {
     long totalNew = 0;
-    HoodieTimeline timeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = 
metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     for (String commit : commitsToCatchup) {
       HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes(
           timeline.getInstantDetails(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, commit)).get(),
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
index c3bbbef0cf4..87bb2b7d406 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
@@ -192,7 +192,7 @@ public class TestTableCommand extends 
CLIFunctionalTestHarness {
     assertTrue(prepareTable());
 
     HoodieTimeline timeline =
-        
HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+        
HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     assertEquals(0, timeline.countInstants(), "There should have no instant at 
first");
 
     // generate four savepoints
@@ -203,14 +203,14 @@ public class TestTableCommand extends 
CLIFunctionalTestHarness {
 
     // Before refresh, no instant
     timeline =
-        
HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+        
HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     assertEquals(0, timeline.countInstants(), "there should have no instant");
 
     Object result = shell.evaluate(() -> command);
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
 
     timeline =
-        
HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+        
HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
 
     // After refresh, there are 4 instants
     assertEquals(4, timeline.countInstants(), "there should have 4 instants");
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
index 8f1d07b4eb5..ced1cf7a3ef 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -137,7 +137,7 @@ public class ITTestSavepointsCommand extends 
HoodieCLIIntegrationTestBase {
     assertEquals(1, timeline.getRestoreTimeline().countInstants());
 
     // 103 instant had rollback
-    assertFalse(timeline.getCommitTimeline().containsInstant(
+    assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant(
         new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
   }
 
@@ -182,9 +182,9 @@ public class ITTestSavepointsCommand extends 
HoodieCLIIntegrationTestBase {
     assertEquals(1, timeline.getRestoreTimeline().countInstants());
 
     // 103 and 104 instant had rollback
-    assertFalse(timeline.getCommitTimeline().containsInstant(
+    assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant(
         new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
-    assertFalse(timeline.getCommitTimeline().containsInstant(
+    assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant(
         new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104")));
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index 99b5d833f50..6023b17ce0d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -143,7 +143,7 @@ public class ConsistentBucketIndexUtils {
           && 
maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName())))
 {
         return loadMetadataFromGivenFile(table, maxMetadataFile);
       }
-      HoodieTimeline completedCommits = 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      HoodieTimeline completedCommits = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
 
       // fix the in-consistency between un-committed and committed hashing 
metadata files.
       List<FileStatus> fixed = new ArrayList<>();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index dd292830a85..46323954a5b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1330,7 +1330,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
   protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
     Option<HoodieInstant> lastCompletedCompactionInstant = 
metadataMetaClient.reloadActiveTimeline()
-        .getCommitTimeline().filterCompletedInstants().lastInstant();
+        .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant();
     if (lastCompletedCompactionInstant.isPresent()
         && metadataMetaClient.getActiveTimeline().filterCompletedInstants()
         
.findInstantsAfter(lastCompletedCompactionInstant.get().getTimestamp()).countInstants()
 < 3) {
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
index 8703ffb9de0..7084ae013e4 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
@@ -132,7 +132,7 @@ public class JavaUpsertPartitioner<T> implements 
Partitioner  {
     // for new inserts, compute buckets depending on how many records we have 
for each partition
     Set<String> partitionPaths = profile.getPartitionPaths();
     long averageRecordSize =
-        
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(),
             config);
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 1c26fb82001..d697c192221 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -1716,7 +1716,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new 
HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004")));
 
     // Compaction may occur if the commits completed in order
-    
assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()
 <= 1);
+    
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()
 <= 1);
 
     // Validation
     validateMetadata(writeClients[0]);
@@ -1763,7 +1763,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // 6 commits and 2 cleaner commits.
       
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
 8);
-      
assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()
 <= 1);
+      
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()
 <= 1);
       // Validation
       validateMetadata(writeClient);
     }
@@ -2034,7 +2034,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // There should not be any compaction yet and we have not performed more 
than maxDeltaCommitsBeforeCompaction
       // deltacommits (1 will be due to bootstrap)
       HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 0);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 0);
       
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(),
 maxDeltaCommitsBeforeCompaction - 1);
       
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 
0);
 
@@ -2044,7 +2044,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       client.startCommitWithTime(newCommitTime);
       client.insert(records, newCommitTime);
       metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 1);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 1);
       
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(),
 maxDeltaCommitsBeforeCompaction + 1);
       
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 
0);
 
@@ -2065,7 +2065,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // Ensure no more compactions took place due to the leftover inflight 
commit
       metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 1);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 1);
       
assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
           ((2 * maxDeltaCommitsBeforeCompaction) + 
(maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in 
metadata table */);
 
@@ -2080,7 +2080,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // Ensure compactions took place
       metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 2);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 2);
       
assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
           ((2 * maxDeltaCommitsBeforeCompaction) + 
(maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in 
metadata table */));
       
assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 
0);
@@ -2428,7 +2428,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         client.upsert(records, newCommitTime);
       }
     }
-    
assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(),
 3);
+    
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 3);
 
     try (HoodieJavaWriteClient client = new 
HoodieJavaWriteClient(engineContext, writeConfig)) {
       // Perform a clean
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 6f5352e2a34..0d4b77ec43d 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -520,7 +520,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage 
extends HoodieJavaClientTe
         0, 150);
 
     HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, 
false);
-    List<HoodieInstant> instants = 
activeTimeline.getCommitTimeline().getInstants();
+    List<HoodieInstant> instants = 
activeTimeline.getCommitAndReplaceTimeline().getInstants();
     assertEquals(5, instants.size());
     assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
         instants.get(0));
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 430f8f01a5e..1e43a4d3840 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -867,7 +867,7 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = createMetaClient();
-    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
 
     if (assertForCommit) {
       assertEquals(3, timeline.findInstantsAfter(initCommitTime, 
Integer.MAX_VALUE).countInstants(),
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 007097a0a6c..230f684d165 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -135,7 +135,7 @@ public class TestMultiFS extends 
HoodieSparkClientTestHarness {
       // Read from hdfs
       FileSystem fs = HadoopFSUtils.getFs(dfsBasePath, 
HoodieTestUtils.getDefaultStorageConf());
       HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(HadoopFSUtils.getStorageConf(fs.getConf()), 
dfsBasePath);
-      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
       Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, 
sqlContext, timeline, readCommitTime);
       assertEquals(readRecords.count(), records.size());
 
@@ -156,7 +156,7 @@ public class TestMultiFS extends 
HoodieSparkClientTestHarness {
       LOG.info("Reading from path: " + tablePath);
       fs = HadoopFSUtils.getFs(tablePath, 
HoodieTestUtils.getDefaultStorageConf());
       metaClient = HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(fs.getConf()), tablePath);
-      timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
       Dataset<Row> localReadRecords =
           HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, 
writeCommitTime);
       assertEquals(localReadRecords.count(), localRecords.size());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index aeb0627744e..9ed2dce3ce5 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -309,7 +309,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
         (String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, 
true, numRecords, 3 * numRecords, 6, false);
 
     // new commit
-    HoodieTimeline curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline curTimeline = 
metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("006"));
     checkReadRecords("000", 3 * numRecords);
 
@@ -333,7 +333,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
 
   private void checkReadRecords(String instantTime, int numExpectedRecords) 
throws IOException {
     if (tableType == HoodieTableType.COPY_ON_WRITE) {
-      HoodieTimeline timeline = 
metaClient.reloadActiveTimeline().getCommitTimeline();
+      HoodieTimeline timeline = 
metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline();
       assertEquals(numExpectedRecords, 
HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, 
timeline, Option.of(instantTime)));
     } else {
       // TODO: This code fails to read records under the following conditions:
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 30b1b63998d..3dfb61c2cea 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2131,7 +2131,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new 
HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004")));
 
     // Compaction may occur if the commits completed in order
-    
assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()
 <= 1);
+    
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()
 <= 1);
 
     // Validation
     validateMetadata(writeClients[0]);
@@ -2179,7 +2179,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // 6 commits and 2 cleaner commits.
       
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
 8);
-      
assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()
 <= 1);
+      
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()
 <= 1);
       // Validation
       validateMetadata(writeClient);
     }
@@ -2444,7 +2444,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // There should not be any compaction yet and we have not performed more 
than maxDeltaCommitsBeforeCompaction
       // deltacommits (1 will be due to bootstrap)
       HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 0);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 0);
       
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(),
 maxDeltaCommitsBeforeCompaction - 1);
       
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 
0);
 
@@ -2454,7 +2454,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       client.startCommitWithTime(newCommitTime);
       client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 1);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 1);
       
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(),
 maxDeltaCommitsBeforeCompaction + 1);
       
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 
0);
 
@@ -2475,7 +2475,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // Ensure no more compactions took place due to the leftover inflight 
commit
       metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 1);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 1);
       
assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
           ((2 * maxDeltaCommitsBeforeCompaction) + 
(maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in 
metadata table */);
 
@@ -2490,7 +2490,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // Ensure compactions took place
       metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-      
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(),
 2);
+      
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 2);
       
assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
           ((2 * maxDeltaCommitsBeforeCompaction) + 
(maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in 
metadata table */));
       
assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 
0);
@@ -3120,7 +3120,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       }
     }
-    
assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(),
 3);
+    
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
 3);
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
       // Perform a clean
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 0db85ae69c1..74e998349ea 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -685,7 +685,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         0, 150);
 
     HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, 
false);
-    List<HoodieInstant> instants = 
activeTimeline.getCommitTimeline().getInstants();
+    List<HoodieInstant> instants = 
activeTimeline.getCommitAndReplaceTimeline().getInstants();
     assertEquals(5, instants.size());
     assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
         instants.get(0));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index c451f4bd938..ad612ee5c9b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -121,7 +121,7 @@ public class TestHoodieMergeHandle extends 
HoodieSparkClientTestHarness {
 
       // verify that there is a commit
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
       assertEquals(1, timeline.findInstantsAfter("000", 
Integer.MAX_VALUE).countInstants(),
           "Expecting a single commit.");
       assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), 
"Latest commit should be 001");
@@ -147,7 +147,7 @@ public class TestHoodieMergeHandle extends 
HoodieSparkClientTestHarness {
 
       // verify that there are 2 commits
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
       assertEquals(2, timeline.findInstantsAfter("000", 
Integer.MAX_VALUE).countInstants(), "Expecting two commits.");
       assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), 
"Latest commit should be 002");
       Dataset<Row> dataSet = getRecords();
@@ -167,7 +167,7 @@ public class TestHoodieMergeHandle extends 
HoodieSparkClientTestHarness {
 
       // verify that there are now 3 commits
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
       assertEquals(3, timeline.findInstantsAfter("000", 
Integer.MAX_VALUE).countInstants(), "Expecting three commits.");
       assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), 
"Latest commit should be 003");
       dataSet = getRecords();
@@ -197,7 +197,7 @@ public class TestHoodieMergeHandle extends 
HoodieSparkClientTestHarness {
       assertNoWriteErrors(statuses);
 
       // verify there are now 4 commits
-      timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
       assertEquals(4, timeline.findInstantsAfter("000", 
Integer.MAX_VALUE).countInstants(), "Expecting four commits.");
       assertEquals(timeline.lastInstant().get().getTimestamp(), newCommitTime, 
"Latest commit should be 004");
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 723fa6b1614..2de9f5d3784 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -154,7 +154,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
     assertNoWriteErrors(statuses.collect());
     // verify that there is a commit
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
     assertEquals(1, timeline.findInstantsAfter("000", 
Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
     // Should have 100 records in table (check using Index), all in locations 
marked at commit
     HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, 
metaClient);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index f037f46a309..9e1f4277c57 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -161,7 +161,7 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
       assertTrue(deltaCommit.isPresent());
       assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit 
should be 001");
 
-      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -195,7 +195,7 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
       assertTrue(deltaCommit.isPresent());
       assertEquals("002", deltaCommit.get().getTimestamp(), "Latest Delta 
commit should be 002");
 
-      commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -653,7 +653,7 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
       assertTrue(deltaCommit.isPresent());
       assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit 
should be 001");
 
-      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 209d70e499a..f271356bcb9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -270,7 +270,7 @@ public class TestInlineCompaction extends 
CompactionTestBase {
     // Then: 1 delta commit is done, the failed compaction is retried
     metaClient = createMetaClient(cfg.getBasePath());
     assertEquals(4, 
metaClient.getActiveTimeline().getWriteTimeline().countInstants());
-    assertEquals(instantTime2, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+    assertEquals(instantTime2, 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
 
   @Test
@@ -308,7 +308,7 @@ public class TestInlineCompaction extends 
CompactionTestBase {
     metaClient = createMetaClient(cfg.getBasePath());
     // 2 delta commits at the beginning. 1 compaction, 1 delta commit 
following it.
     assertEquals(4, 
metaClient.getActiveTimeline().getWriteTimeline().countInstants());
-    assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+    assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
 
   @Test
@@ -345,6 +345,6 @@ public class TestInlineCompaction extends 
CompactionTestBase {
     // Then: 1 delta commit is done, the failed compaction is retried
     metaClient = createMetaClient(cfg.getBasePath());
     assertEquals(4, 
metaClient.getActiveTimeline().getWriteTimeline().countInstants());
-    assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+    assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 00ff11b57d0..e78ed757e8f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -289,7 +289,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     //2. rollback
     HoodieInstant commitInstant;
     if (isUsingMarkers) {
-      commitInstant = 
table.getActiveTimeline().getCommitTimeline().filterInflights().lastInstant().get();
+      commitInstant = 
table.getActiveTimeline().getCommitAndReplaceTimeline().filterInflights().lastInstant().get();
     } else {
       commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 8e85208af6f..dd1d6c2431a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -284,7 +284,7 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       assertTrue(deltaCommit.isPresent());
       assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit 
should be 001");
 
-      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -327,7 +327,7 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       assertTrue(deltaCommit.isPresent());
       assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta 
commit should be 004");
 
-      commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       allFiles = listAllBaseFilesInPath(hoodieTable);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 10d26f83698..c08026946c0 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -123,7 +123,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends TestHoodieSparkRoll
       client.commit(newCommitTime, jsc().parallelize(statuses));
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertTrue(commit.isPresent());
       assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
 
@@ -199,7 +199,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends TestHoodieSparkRoll
       assertTrue(deltaCommit.isPresent());
       assertEquals("000000001", deltaCommit.get().getTimestamp(), "Delta 
commit should be 000000001");
 
-      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -505,7 +505,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends TestHoodieSparkRoll
       assertEquals(200, 
getTotalRecordsWritten(instantCommitMetadataPairOpt.get().getValue()));
 
       Option<HoodieInstant> commit =
-          metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+          
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), 
metaClient);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 09aff48224d..b41c15a9898 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -529,7 +529,7 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, basePath);
-    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
 
     if (assertForCommit) {
       assertEquals(3, timeline.findInstantsAfter(initCommitTime, 
Integer.MAX_VALUE).countInstants(),
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index e45578211cb..79dda856367 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -289,7 +289,7 @@ public class SparkClientFunctionalTestHarness implements 
SparkProvider, HoodieMe
         "Delta commit should be specified value");
 
     Option<HoodieInstant> commit =
-        
reloadedMetaClient.getActiveTimeline().getCommitTimeline().lastInstant();
+        
reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant();
     assertFalse(commit.isPresent());
 
     List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -337,7 +337,7 @@ public class SparkClientFunctionalTestHarness implements 
SparkProvider, HoodieMe
         "Latest Delta commit should match specified time");
 
     Option<HoodieInstant> commit =
-        
reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+        
reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
     assertFalse(commit.isPresent());
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 319cbdfbb4a..436a8c221fe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -547,7 +547,7 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieTimeline getCommitsTimeline() {
     switch (this.getTableType()) {
       case COPY_ON_WRITE:
-        return getActiveTimeline().getCommitTimeline();
+        return getActiveTimeline().getCommitAndReplaceTimeline();
       case MERGE_ON_READ:
         // We need to include the parquet files written out in delta commits
         // Include commit action to be able to start doing a MOR over a COW 
table - no
@@ -567,7 +567,7 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieTimeline getCommitsAndCompactionTimeline() {
     switch (this.getTableType()) {
       case COPY_ON_WRITE:
-        return getActiveTimeline().getCommitTimeline();
+        return getActiveTimeline().getCommitAndReplaceTimeline();
       case MERGE_ON_READ:
         return getActiveTimeline().getWriteTimeline();
       default:
@@ -583,7 +583,7 @@ public class HoodieTableMetaClient implements Serializable {
       case COPY_ON_WRITE:
       case MERGE_ON_READ:
         // We need to include the parquet files written out in delta commits 
in tagging
-        return getActiveTimeline().getCommitTimeline();
+        return getActiveTimeline().getCommitAndReplaceTimeline();
       default:
         throw new HoodieException("Unsupported table type :" + 
this.getTableType());
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 68cf428d364..12ea0085d51 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -318,13 +318,20 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
   }
 
   /**
-   * Get only pure commits (inflight and completed) in the active timeline.
+   * Get only pure commit and replace commits (inflight and completed) in the 
active timeline.
    */
-  public HoodieTimeline getCommitTimeline() {
+  public HoodieTimeline getCommitAndReplaceTimeline() {
     //TODO: Make sure this change does not break existing functionality.
     return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
   }
 
+  /**
+   * Get only pure commits (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getCommitTimeline() {
+    return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION));
+  }
+
   /**
    * Get only the delta commits (inflight and completed) in the active 
timeline.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 0f41f1314e1..4ef30a2656a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -285,8 +285,7 @@ public class CompactionUtils {
    */
   public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestCompaction(
       HoodieActiveTimeline activeTimeline) {
-    Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
+    Option<HoodieInstant> lastCompaction = 
activeTimeline.getCommitTimeline().filterCompletedInstants().lastInstant();
     HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
 
     HoodieInstant latestInstant;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index efdb1baf23d..2cb42af683b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -593,7 +593,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   @Override
   public Option<String> getLatestCompactionTime() {
     if (metadataMetaClient != null) {
-      Option<HoodieInstant> latestCompaction = 
metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+      Option<HoodieInstant> latestCompaction = 
metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant();
       if (latestCompaction.isPresent()) {
         return Option.of(latestCompaction.get().getTimestamp());
       }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index 9bbc72289f5..0b90889cfa7 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -86,7 +86,7 @@ public class TestHoodieTableMetaClient extends 
HoodieCommonTestHarness {
   @Test
   public void checkCommitTimeline() {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty(), "Should be empty commit 
timeline");
 
     HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, "1");
@@ -95,12 +95,12 @@ public class TestHoodieTableMetaClient extends 
HoodieCommonTestHarness {
 
     // Commit timeline should not auto-reload every time 
getActiveCommitTimeline(), it should be cached
     activeTimeline = metaClient.getActiveTimeline();
-    activeCommitTimeline = activeTimeline.getCommitTimeline();
+    activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty(), "Should be empty commit 
timeline");
 
-    HoodieInstant completedInstant = 
HoodieTimeline.getCompletedInstant(instant);
     activeTimeline = activeTimeline.reload();
-    activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieInstant completedInstant = 
activeTimeline.getCommitsTimeline().getInstantsAsStream().findFirst().get();
+    activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline();
     assertFalse(activeCommitTimeline.empty(), "Should be the 1 commit we 
made");
     assertEquals(completedInstant, 
activeCommitTimeline.getInstantsAsStream().findFirst().get(),
         "Commit should be 1");
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index eef515c6ada..588fc114a3e 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -107,7 +107,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
   @Test
   public void testGetPartitionsWithReplaceCommits() throws IOException {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty());
 
     String ts1 = "1";
@@ -146,7 +146,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
   @Test
   public void testGetPartitions() throws IOException {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty());
 
     String olderPartition = "0"; // older partitions that is modified by all 
cleans
@@ -185,7 +185,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
   @Test
   public void testGetPartitionsUnPartitioned() throws IOException {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty());
 
     String partitionPath = "";
@@ -213,7 +213,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
   @Test
   public void testRestoreInstants() throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty());
 
     for (int i = 1; i <= 5; i++) {
@@ -238,7 +238,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     String extraMetadataKey = "test_key";
     String extraMetadataValue1 = "test_value1";
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty());
     assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, 
extraMetadataKey).isPresent());
 
@@ -616,7 +616,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
   @Test
   public void testGetDroppedPartitions() throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitAndReplaceTimeline();
     assertTrue(activeCommitTimeline.empty());
 
     String olderPartition = "p1"; // older partitions that will be deleted by 
clean commit
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index fa2d7558ef5..1d4be5f02c8 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -120,12 +120,16 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
     assertStreamEquals(
         Stream.of(instant1Complete, instant2Complete, instant3Complete, 
instant4Complete, instant5),
         timeline.getCommitTimeline().getInstantsAsStream(), "Check the 
instants stream");
+
+    assertStreamEquals(
+        Stream.of(instant1Complete, instant2Complete, instant3Complete, 
instant4Complete, instant5),
+        timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check 
the instants stream");
     assertStreamEquals(
         Stream.of(instant1Complete, instant2Complete, instant3Complete, 
instant4Complete),
-        
timeline.getCommitTimeline().filterCompletedInstants().getInstantsAsStream(),
+        
timeline.getCommitAndReplaceTimeline().filterCompletedInstants().getInstantsAsStream(),
         "Check the instants stream");
     assertStreamEquals(Stream.of(instant5),
-        
timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(),
+        
timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(),
         "Check the instants stream");
 
     // Backwards compatibility testing for reading compaction plans
@@ -174,23 +178,23 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
     timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", 
"11", "13", "15", "17", "19"),
         Stream.of("21", "23"));
     assertStreamEquals(Stream.of("05", "07", "09", "11"),
-        
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04",
 "11")
+        
timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInRange("04",
 "11")
             .getInstantsAsStream().map(HoodieInstant::getTimestamp),
         "findInstantsInRange should return 4 instants");
     assertStreamEquals(Stream.of("03", "05", "07", "09", "11"),
-        
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInClosedRange("03",
 "11")
+        
timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInClosedRange("03",
 "11")
             .getInstantsAsStream().map(HoodieInstant::getTimestamp),
         "findInstantsInClosedRange should return 5 instants");
     assertStreamEquals(Stream.of("09", "11"),
-        
timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 
2)
+        
timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsAfter("07",
 2)
             .getInstantsAsStream().map(HoodieInstant::getTimestamp),
         "findInstantsAfter 07 should return 2 instants");
     assertStreamEquals(Stream.of("01", "03", "05"),
-        
timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07")
+        
timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsBefore("07")
             .getInstantsAsStream().map(HoodieInstant::getTimestamp),
         "findInstantsBefore 07 should return 3 instants");
     assertFalse(timeline.empty());
-    
assertFalse(timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().empty());
+    
assertFalse(timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().empty());
     assertEquals(12, timeline.countInstants());
     assertEquals("01", timeline.firstInstant(
         HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp());
@@ -201,7 +205,7 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
     assertFalse(timeline.firstInstant(
         HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent());
     
-    HoodieTimeline activeCommitTimeline = 
timeline.getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline activeCommitTimeline = 
timeline.getCommitAndReplaceTimeline().filterCompletedInstants();
     assertEquals(10, activeCommitTimeline.countInstants());
 
     assertEquals("01", 
activeCommitTimeline.firstInstant().get().getTimestamp());
@@ -346,7 +350,7 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
         HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, 
HoodieTimeline.REPLACE_COMMIT_ACTION));
     checkTimeline.accept(timeline.getWriteTimeline(), 
CollectionUtils.createSet(
         HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, 
HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
-    checkTimeline.accept(timeline.getCommitTimeline(),  
CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, 
HoodieTimeline.REPLACE_COMMIT_ACTION));
+    checkTimeline.accept(timeline.getCommitAndReplaceTimeline(),  
CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, 
HoodieTimeline.REPLACE_COMMIT_ACTION));
     checkTimeline.accept(timeline.getDeltaCommitTimeline(), 
Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION));
     checkTimeline.accept(timeline.getCleanerTimeline(), 
Collections.singleton(HoodieTimeline.CLEAN_ACTION));
     checkTimeline.accept(timeline.getRollbackTimeline(), 
Collections.singleton(HoodieTimeline.ROLLBACK_ACTION));
@@ -551,12 +555,12 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
   public void testReplaceActionsTimeline() {
     int instantTime = 1;
     List<HoodieInstant> allInstants = new ArrayList<>();
-    HoodieInstant instant = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
-    allInstants.add(instant);
-    instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
String.format("%03d", instantTime++));
-    allInstants.add(instant);
-    instant = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++));
-    allInstants.add(instant);
+    HoodieInstant instant1 = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(instant1);
+    HoodieInstant instant2 = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(instant2);
+    HoodieInstant instant3 = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(instant3);
 
     timeline = new HoodieActiveTimeline(metaClient);
     timeline.setInstants(allInstants);
@@ -564,8 +568,16 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
         timeline.getCompletedReplaceTimeline().getInstants();
 
     assertEquals(1, validReplaceInstants.size());
-    assertEquals(instant.getTimestamp(), 
validReplaceInstants.get(0).getTimestamp());
+    assertEquals(instant3.getTimestamp(), 
validReplaceInstants.get(0).getTimestamp());
     assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, 
validReplaceInstants.get(0).getAction());
+
+    assertStreamEquals(
+        Stream.of(instant1, instant2, instant3),
+        timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check 
the instants stream");
+
+    assertStreamEquals(
+        Stream.of(instant1, instant2),
+        timeline.getCommitTimeline().getInstantsAsStream(), "Check the 
instants stream");
   }
 
   @Test
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index 4741cdef1f8..407251c64b2 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -291,6 +291,59 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void 
testGetDeltaCommitsSinceLastCompactionWithCompletedReplaceCommits() {
+    // no replace commit in this timeline; 06 is the latest completed compaction commit.
+    HoodieActiveTimeline timeline = new MockHoodieActiveTimeline(
+        Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"01"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+            new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, 
"09")));
+
+    Pair<HoodieTimeline, HoodieInstant> actual =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get();
+    assertEquals(
+        Stream.of(
+                new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"07"),
+                new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, 
"08"),
+                new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, 
"09"))
+            .collect(Collectors.toList()),
+        actual.getLeft().getInstants());
+    assertEquals(
+        new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"),
+        actual.getRight());
+
+    // mix of compaction commit and replace commit.
+    timeline = new MockHoodieActiveTimeline(
+        Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"01"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+            new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"),
+            new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, 
"06"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, 
"09")));
+
+    actual =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get();
+    assertEquals(
+        Stream.of(
+                new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"07"),
+                new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
"08"),
+                new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, 
"09"))
+            .collect(Collectors.toList()),
+        actual.getLeft().getInstants());
+    assertEquals(
+        new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"),
+        actual.getRight());
+  }
+
   @Test
   public void testGetDeltaCommitsSinceLatestCompactionWithEmptyDeltaCommits() {
     HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
@@ -386,6 +439,11 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
       this.setInstants(new ArrayList<>());
     }
 
+    public MockHoodieActiveTimeline(Stream<HoodieInstant> instants) {
+      super();
+      setInstants(instants.collect(Collectors.toList()));
+    }
+
     public MockHoodieActiveTimeline(
         Stream<String> completedDeltaCommits,
         Stream<String> completedCompactionCommits,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
index 2319d40480e..1f523aabc99 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
@@ -54,7 +54,7 @@ class RepairAddpartitionmetaProcedure extends BaseProcedure 
with ProcedureBuilde
 
     val metaClient = createMetaClient(jsc, tablePath)
 
-    val latestCommit: String = 
metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+    val latestCommit: String = 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp
     val partitionPaths: util.List[String] = 
FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getStorage, tablePath);
     val basePath: StoragePath = new StoragePath(tablePath)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index b9f43e12e66..292f6d5fdee 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -72,7 +72,7 @@ class RepairMigratePartitionMetaProcedure extends 
BaseProcedure with ProcedureBu
         metaClient.getStorage, partition)
       val baseFormatFile: Option[StoragePath] = 
HoodiePartitionMetadata.baseFormatMetaPathIfExists(
         metaClient.getStorage, partition)
-      val latestCommit: String = 
metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+      val latestCommit: String = 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp
       var action = if (textFormatFile.isPresent) "MIGRATE" else "NONE"
       if (!dryRun) {
         if (!baseFormatFile.isPresent) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 4afa328b84a..1a025042f9b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -68,7 +68,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure 
with ProcedureBuil
         .withBasePath(basePath)
         .withLogFilePaths(logFilePaths.asJava)
         .withReaderSchema(schema)
-        
.withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
+        
.withLatestInstantTime(client.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp)
         
.withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
         
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
         
.withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
index 36be3b14678..5556fd93b33 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
@@ -46,7 +46,7 @@ class StatsWriteAmplificationProcedure extends BaseProcedure 
with ProcedureBuild
     val basePath = getBasePath(table)
     val client = createMetaClient(jsc, basePath)
     val activeTimeline = client.getActiveTimeline
-    val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants()
+    val timeline = 
activeTimeline.getCommitAndReplaceTimeline.filterCompletedInstants()
 
     val rows = new java.util.ArrayList[Row]
     val df = new DecimalFormat("#.00")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
index 10a10160745..57a17b213b8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
@@ -190,7 +190,7 @@ class ValidateHoodieSyncProcedure extends BaseProcedure 
with ProcedureBuilder wi
   @throws[IOException]
   def countNewRecords(target: HoodieTableMetaClient, commitsToCatchup: 
List[String]): Long = {
     var totalNew: Long = 0
-    val timeline: HoodieTimeline = 
target.reloadActiveTimeline.getCommitTimeline.filterCompletedInstants
+    val timeline: HoodieTimeline = 
target.reloadActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants
     for (commit <- commitsToCatchup) {
       val c: HoodieCommitMetadata = 
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(new 
HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get, 
classOf[HoodieCommitMetadata])
       totalNew += c.fetchTotalRecordsWritten - c.fetchTotalUpdateRecordsWritten
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java 
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 086363e447c..d02204dbe9b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -202,9 +202,9 @@ public class HoodieJavaStreamingApp {
     HoodieTableMetaClient metaClient = 
HoodieClientTestUtils.createMetaClient(jssc, tablePath);
     if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       // Ensure we have successfully completed one compaction commit
-      
ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants()
 == 1);
+      
ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants()
 == 1);
     } else {
-      
ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants()
 >= 1);
+      
ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants()
 >= 1);
     }
 
     // Deletes Stream
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index ad017a5a4dc..6e9e2a0a481 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -177,6 +177,6 @@ class TestMORDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     }
     // compaction should have been completed
     val metaClient = HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(fs.getConf), basePath)
-    assertEquals(1, 
metaClient.getActiveTimeline.getCommitTimeline.countInstants())
+    assertEquals(1, 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.countInstants())
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 054744109b0..babe1f73acd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -503,6 +503,6 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
       streamingWrite(inputDF.schema, sourcePath, destPath, opts, id)
     }
     val metaClient = HoodieTestUtils.createMetaClient(storage, destPath);
-    assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty())
+    
assertTrue(metaClient.getActiveTimeline.getCommitAndReplaceTimeline.empty())
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index cad585b6453..2da80c888dd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import 
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, 
recordsToStrings}
 import org.apache.hudi.config.HoodieWriteConfig
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SaveMode}
@@ -333,6 +332,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2))
     inputDF4.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -357,6 +357,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
       .options(options)
       .option("hoodie.clustering.inline", "true")
       .option("hoodie.clustering.inline.max.commits", "1")
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
     val instant5 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -385,6 +386,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val inputDF6 = spark.read.json(spark.sparkContext.parallelize(records6, 2))
     inputDF6.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -407,27 +409,32 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val inputDF7 = spark.read.json(spark.sparkContext.parallelize(records7, 2))
     inputDF7.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
+    totalInsertedCnt += 7
 
     val records8 = recordsToStrings(dataGen.generateInserts("007", 
3)).asScala.toList
     val inputDF8 = spark.read.json(spark.sparkContext.parallelize(records8, 2))
     inputDF8.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
     val instant8 = metaClient.reloadActiveTimeline.lastInstant().get()
     val commitTime8 = instant8.getTimestamp
+    totalInsertedCnt += 3
 
     // 8. Upsert Operation With Clean Operation
-    val records9 = recordsToStrings(dataGen.generateUniqueUpdates("008", 
30)).asScala.toList
-    val inputDF9 = spark.read.json(spark.sparkContext.parallelize(records9, 2))
+    val inputDF9 = inputDF6.limit(30) // 30 updates to the inserts added by the 
insert overwrite table operation. If not for this, updates generated from datagen
+    // could be split into inserts and updates from Hudi's standpoint due to the insert 
overwrite table operation.
     inputDF9.write.format("org.apache.hudi")
       .options(options)
       .option("hoodie.clean.automatic", "true")
-      .option("hoodie.keep.min.commits", "4")
-      .option("hoodie.keep.max.commits", "5")
-      .option("hoodie.cleaner.commits.retained", "3")
+      .option("hoodie.keep.min.commits", "16")
+      .option("hoodie.keep.max.commits", "17")
+      .option("hoodie.clean.commits.retained", "15")
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
     val instant9 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -440,13 +447,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val updatedCnt9 = 30 - insertedCnt9
     assertCDCOpCnt(cdcDataOnly9, insertedCnt9, updatedCnt9, 0)
 
-    // here cause we do the clean operation and just remain the commit6 and 
commit7, so we need to reset the total cnt.
-    // 70 is the number of inserted records at commit 6.
-    totalInsertedCnt = 80 + insertedCnt9
-    totalUpdatedCnt = updatedCnt9
-    totalDeletedCnt = 0
     allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString)
-    assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, 
totalDeletedCnt)
+    assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt + 30, 
totalDeletedCnt)
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 5675ac4ebe9..672f3308765 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -254,7 +254,7 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
         metaClient.getStorage.listDirectEntries(new 
StoragePath(duplicatedPartitionPath)))
       val filteredStatuses = 
fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 3 files
@@ -311,7 +311,7 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
         metaClient.getStorage.listDirectEntries(new 
StoragePath(duplicatedPartitionPathWithUpdates)))
       val filteredStatuses = 
fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 2 files
@@ -369,7 +369,7 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
         metaClient.getStorage.listDirectEntries(new 
StoragePath(duplicatedPartitionPathWithUpserts)))
       val filteredStatuses = 
fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 3 files
@@ -427,7 +427,7 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
         metaClient.getStorage.listDirectEntries(new 
StoragePath(duplicatedPartitionPath)))
       val filteredStatuses = 
fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 3 files
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index e28b5bdec59..51a8d26754a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -645,7 +645,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
 
     static void assertAtleastNCompactionCommits(int minExpected, String 
tablePath) {
       HoodieTableMetaClient meta = createMetaClient(storage, tablePath);
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
       int numCompactionCommits = timeline.countInstants();
       assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
@@ -661,7 +661,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
 
     static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, 
String lastSuccessfulCommit, String tablePath) {
       HoodieTableMetaClient meta = createMetaClient(storage.getConf(), 
tablePath);
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
       int numCompactionCommits = timeline.countInstants();
       assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index cb30d3dc0be..4da6ef51b62 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -875,7 +875,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // delete compaction commit
     HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage, 
tableBasePath);
-    HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     HoodieInstant commitInstant = timeline.lastInstant().get();
     String commitFileName = tableBasePath + "/.hoodie/" + 
commitInstant.getFileName();
     fs.delete(new Path(commitFileName), false);


Reply via email to