This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 506f106cc2f [HUDI-7507] Adding timestamp ordering validation before creating requested instant (#11580)
506f106cc2f is described below

commit 506f106cc2f17021342a017cac023a43b15b9e01
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Oct 7 14:36:21 2024 -0700

    [HUDI-7507] Adding timestamp ordering validation before creating requested instant (#11580)
    
    - Adding timestamp ordering validation before creating requested timeline file
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  26 ++++-
 .../hudi/client/timeline/TimestampUtils.java       |  37 +++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  17 +++
 .../java/org/apache/hudi/table/HoodieTable.java    |  13 +++
 .../action/clean/CleanPlanActionExecutor.java      |   1 +
 .../cluster/ClusteringPlanActionExecutor.java      |   1 +
 .../compact/ScheduleCompactionActionExecutor.java  |   1 +
 .../rollback/BaseRollbackPlanActionExecutor.java   |   1 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   5 +
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   5 +
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   5 +
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   5 +
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   6 +-
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  |  63 +++++++----
 .../apache/hudi/client/SparkRDDWriteClient.java    |   5 +
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   5 +
 .../TestMultiWriterWithPreferWriterIngestion.java  | 121 ++++++++++++++++-----
 .../functional/TestHoodieBackedMetadata.java       |   6 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  90 +++++++++++----
 .../java/org/apache/hudi/table/TestCleaner.java    |  46 ++++----
 .../clean/TestCleanerInsertAndCleanByVersions.java |   2 +-
 .../TestCopyOnWriteRollbackActionExecutor.java     |   9 +-
 .../table/functional/MockCompactionStrategy.java   |  41 +++++++
 .../table/functional/TestCleanPlanExecutor.java    |  61 ++++++-----
 .../TestHoodieSparkMergeOnReadTableCompaction.java | 106 ++++++++++++++++++
 .../hudi/testutils/HoodieCleanerTestBase.java      |  22 ++--
 .../hudi/functional/RecordLevelIndexTestBase.scala |  28 +++--
 .../TestHoodieBulkInsertDataInternalWriter.java    |   7 +-
 .../TestHoodieDataSourceInternalWriter.java        |  11 +-
 .../TestHoodieBulkInsertDataInternalWriter.java    |   5 +-
 .../TestHoodieDataSourceInternalBatchWrite.java    |   8 +-
 .../sources/TestGcsEventsHoodieIncrSource.java     |   2 +-
 .../utilities/sources/TestHoodieIncrSource.java    |  24 ++--
 .../sources/TestS3EventsHoodieIncrSource.java      |   2 +-
 34 files changed, 605 insertions(+), 182 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b9da3387654..780d48d16bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -30,6 +30,7 @@ import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.timeline.TimestampUtils;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
@@ -331,6 +332,21 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
 
   protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient);
 
+  /**
+   * Validate timestamp for new commits. Here we validate that the instant time being validated is the latest among all entries in the timeline.
+   * This will ensure timestamps are monotonically increasing. With multi-writers, out-of-order commit completions are still possible, just that
+   * when a new commit starts, it will always get the highest commit time compared to other instants in the timeline.
+   * @param metaClient instance of {@link HoodieTableMetaClient} to be used.
+   * @param instantTime instant time of the current commit that is in progress.
+   */
+  protected abstract void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime);
+
+  protected void validateTimestampInternal(HoodieTableMetaClient metaClient, String instantTime) {
+    if (config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+    }
+  }
+
   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
     if (writeTimer != null) {
       long durationInMs = metrics.getDurationInMs(writeTimer.stop());
@@ -932,6 +948,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
             + "table could be in an inconsistent state. Pending restores: " + Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
             .map(instant -> instant.getTimestamp()).collect(Collectors.toList()).toArray()));
 
+    HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, instantTime, actionType);
+    this.txnManager.beginTransaction(Option.of(requestedInstant),
+        lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
+    try {
+      validateTimestamp(metaClient, instantTime);
+    } finally {
+      txnManager.endTransaction(Option.of(requestedInstant));
+    }
+
     // if there are pending compactions, their instantTime must not be greater than that of this instant time
     metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
         ValidationUtils.checkArgument(
@@ -941,7 +966,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     if (config.getFailedWritesCleanPolicy().isLazy()) {
       this.heartbeatClient.start(instantTime);
     }
-
     if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
       metaClient.getActiveTimeline().createRequestedReplaceCommit(instantTime, actionType);
     } else {
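
For readers skimming the hunk above: the new check runs inside a short transaction, so that under optimistic concurrency control two writers cannot race each other while creating their requested instants. A condensed sketch of the added startCommit-time flow (names are from this diff; lastCompletedTxn stands in for the Option computed from lastCompletedTxnAndMetadata):

    HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, instantTime, actionType);
    this.txnManager.beginTransaction(Option.of(requestedInstant), lastCompletedTxn);
    try {
      validateTimestamp(metaClient, instantTime);  // reloads the timeline; fails if any instant is newer
    } finally {
      txnManager.endTransaction(Option.of(requestedInstant));
    }
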
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
new file mode 100644
index 00000000000..dfc1578290b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+
+public class TimestampUtils {
+
+  public static void validateForLatestTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
+    // validate that the instant whose requested file is about to be created is the latest in the timeline.
+    if (!metaClient.isMetadataTable()) { // for the data table, validate that timestamps are generated in monotonically increasing order.
+      HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
+      reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry -> {
+        ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
+            "Found later commit time " + entry + ", compared to the current instant " + instantTime + ", hence failing to create requested commit meta file");
+      });
+    }
+  }
+}
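
Hudi instant times are fixed-width timestamp strings, so the check above amounts to a comparison against the latest write-timeline entry, failing with an IllegalArgumentException (via ValidationUtils.checkArgument) when a newer instant already exists. A minimal self-contained sketch of the same invariant, assuming plain String comparison mirrors HoodieTimeline.compareTimestamps for these fixed-width timestamps:

    // Hypothetical standalone illustration; not part of the patch.
    static void assertMonotonic(String latestExistingInstant, String newInstant) {
      // Instant times like "20241007143621000" order correctly as strings.
      if (latestExistingInstant.compareTo(newInstant) > 0) {
        throw new IllegalArgumentException("Found later commit time " + latestExistingInstant
            + ", compared to the current instant " + newInstant);
      }
    }
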
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 01a2b9c0c86..f4b2be343cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -741,6 +741,14 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
           + "By default, no Hudi client init callback is executed.");
 
+  public static final ConfigProperty<Boolean> 
ENABLE_TIMESTAMP_ORDERING_VALIDATION = ConfigProperty
+      .key("hoodie.timestamp.ordering.validate.enable")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.15.1")
+      .withDocumentation("Enable validation for commit time generation to 
ensure new commit time generated is always the latest among other entries. "
+          + "This is for additional safety to always generate a monotonically 
increasing commit times (for ingestion writer, table services etc).");
+
   /**
    * Config key with boolean value that indicates whether record being written 
during MERGE INTO Spark SQL
    * operation are already prepped.
@@ -2621,6 +2629,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
   }
 
+  public Boolean shouldEnableTimestampOrderingValidation() {
+    return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3135,6 +3147,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withEnableTimestampOrderingValidation(boolean enableTimestampOrderingValidation) {
+      writeConfig.setValue(ENABLE_TIMESTAMP_ORDERING_VALIDATION, Boolean.toString(enableTimestampOrderingValidation));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
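
Taken together with the write-client changes, the new property is only consulted when multi-writer concurrency is enabled. The effective gate, schematically (config is any HoodieWriteConfig built as above):

    boolean shouldValidate = config.shouldEnableTimestampOrderingValidation()
        && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
    // Only when both hold does the writer reload the timeline and compare instant times.
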
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 5d1279af2f7..8ff7b279024 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -31,6 +31,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.timeline.TimestampUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -896,6 +897,18 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
     }
   }
 
+  /**
+   * Validates that the instantTime is the latest in the write timeline.
+   * @param instantTime instant time of interest.
+   */
+  public abstract void validateForLatestTimestamp(String instantTime);
+
+  protected void validateForLatestTimestampInternal(String instantTime) {
+    if (this.config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+    }
+  }
+
   public HoodieFileFormat getBaseFileFormat() {
     return metaClient.getTableConfig().getBaseFileFormat();
   }
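
Each engine then decides whether to enforce the check: as the hunks below show, the Java and Spark tables delegate to validateForLatestTimestampInternal, while the Flink table overrides the hook as a no-op. The pattern, schematically (class bodies elided):

    // Java/Spark tables (HoodieJavaCopyOnWriteTable, HoodieSparkCopyOnWriteTable):
    @Override
    public void validateForLatestTimestamp(String instantTime) {
      validateForLatestTimestampInternal(instantTime);
    }

    // Flink table (HoodieFlinkCopyOnWriteTable): instant handling differs, so a no-op.
    @Override
    public void validateForLatestTimestamp(String instantTime) {
      // no-op
    }
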
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 0329fc8ddc6..2cab17aab90 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -179,6 +179,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
       final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
       // Save to both aux and timeline folder
       try {
+        table.validateForLatestTimestamp(cleanInstant.getTimestamp());
         table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
         LOG.info("Requesting Cleaning with instant time " + cleanInstant);
       } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 54df15d6e80..91f64b1cee9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -98,6 +98,7 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor
             .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
             .setClusteringPlan(planOption.get())
             .build();
+        table.validateForLatestTimestamp(clusteringInstant.getTimestamp());
         table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
             TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
       } catch (IOException ioe) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 77178c55455..e28c1ccd143 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -114,6 +114,7 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
     Option<HoodieCompactionPlan> option = Option.empty();
     if (plan != null && nonEmpty(plan.getOperations())) {
       extraMetadata.ifPresent(plan::setExtraMetadata);
+      table.validateForLatestTimestamp(instantTime);
       try {
         if (operationType.equals(WriteOperationType.COMPACT)) {
           HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index 48af32751a3..045e8a5a720 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -113,6 +113,7 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> extends BaseActionExecut
       HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new HoodieInstantInfo(instantToRollback.getTimestamp(),
           instantToRollback.getAction()), rollbackRequests, LATEST_ROLLBACK_PLAN_VERSION);
       if (!skipTimelinePublish) {
+        table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
         if (table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp())) {
           LOG.warn("Request Rollback found with instant time " + rollbackInstant + ", hence skipping scheduling rollback");
         } else {
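
With the four hunks above, every table-service scheduler (clean, cluster, compact, rollback) performs the same guard immediately before writing its requested meta file. The shared shape, condensed (saveToXxxRequested stands in for the action-specific save call):

    table.validateForLatestTimestamp(instant.getTimestamp());  // throws IllegalArgumentException on an out-of-order time
    table.getActiveTimeline().saveToXxxRequested(instant, serializedPlan);
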
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index b31d4470d43..d762d52d9ee 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -123,6 +123,11 @@ public class HoodieFlinkWriteClient<T> extends
     return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
   }
 
+  @Override
+  protected void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
+    // no op
+  }
+
   @Override
   public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 705299e6f97..e65b89ab43a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -92,6 +92,11 @@ public class HoodieFlinkCopyOnWriteTable<T>
     super(config, context, metaClient);
   }
 
+  @Override
+  public void validateForLatestTimestamp(String instantTime) {
+    // no-op
+  }
+
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 596767e8cc6..ca55244128d 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -103,6 +103,11 @@ public class HoodieJavaWriteClient<T> extends
     return HoodieJavaTable.create(config, context, metaClient);
   }
 
+  @Override
+  protected void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
+    validateTimestampInternal(metaClient, instantTime);
+  }
+
   @Override
   public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
                                   String instantTime) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 525f153a395..cc91d5d3436 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -92,6 +92,11 @@ public class HoodieJavaCopyOnWriteTable<T>
     super(config, context, metaClient);
   }
 
+  @Override
+  public void validateForLatestTimestamp(String instantTime) {
+    validateForLatestTimestampInternal(instantTime);
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext 
context,
                                                        String instantTime,
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 5d58059b573..d2f069afbe4 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -1786,7 +1786,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     HoodieJavaWriteClient client = getHoodieWriteClient(config);
 
     // Write 1 (Bulk insert)
-    String newCommitTime = "0000001";
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
     client.startCommitWithTime(newCommitTime);
     List<WriteStatus> writeStatuses = client.insert(records, newCommitTime);
@@ -1794,7 +1794,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(client);
 
     // Write 2 (inserts)
-    newCommitTime = "0000002";
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     client.startCommitWithTime(newCommitTime);
     records = dataGen.generateInserts(newCommitTime, 20);
     writeStatuses = client.insert(records, newCommitTime);
@@ -1827,7 +1827,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
             replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));
 
     // trigger new write to mimic other writes succeeding before re-attempt.
-    newCommitTime = "0000003";
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     client.startCommitWithTime(newCommitTime);
     records = dataGen.generateInserts(newCommitTime, 20);
     writeStatuses = client.insert(records, newCommitTime);
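
The test updates in this and the following files follow one recipe: hardcoded commit times such as "0000001" are replaced with generated instant times, since a fixed literal would now trip the ordering validation once any real timestamp exists in the timeline. The before/after, in miniature:

    // before: a literal that can sort below existing instants
    String newCommitTime = "0000001";
    // after: a generated, monotonically increasing instant time
    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
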
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 7dee7d2b29c..5fc2c36acfc 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -1319,25 +1319,29 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
     HoodieJavaWriteClient client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
 
     // perform 1 successful commit
-    writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, true);
 
     // Perform 2 failed writes to table
-    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit1,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
     // refresh data generator to delete records generated from failed commits
     dataGen = new HoodieTestDataGenerator();
     // Perform 1 successful write
+    String commit4 = HoodieActiveTimeline.createNewInstantTime();
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+    writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)), commit4,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, true);
     HoodieTableMetaClient metaClient = createMetaClient();
@@ -1349,13 +1353,14 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
     // Await till enough time passes such that the first 2 failed commits heartbeats are expired
     boolean conditionMet = false;
     while (!conditionMet) {
-      conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+      conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
       Thread.sleep(2000);
     }
     client.close();
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     // Perform 1 successful write
-    writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
+    String commit5 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit5)), commit5,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, true);
     client.clean();
@@ -1396,47 +1401,53 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
     HoodieTestUtils.init(storageConf, basePath);
     HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
     HoodieJavaWriteClient client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
     // Perform 1 successful write to table
-    writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+    writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, true);
 
     // Perform 1 failed write to table
-    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit2,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
     // Toggle cleaning policy to LAZY
     cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
     // Perform 2 failed writes to table
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
-    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
+    String commit4 = HoodieActiveTimeline.createNewInstantTime();
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
-    writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+    writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)), commit4,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
     // Await till enough time passes such that the 2 failed commits heartbeats are expired
     boolean conditionMet = false;
     while (!conditionMet) {
-      conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
+      conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit4);
       Thread.sleep(2000);
     }
     client.clean();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
     assertTrue(timeline.getTimelineOfActions(
         CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+    String commit5 = HoodieActiveTimeline.createNewInstantTime();
     // Perform 2 failed commits
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
-    writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
+    String commit6 = HoodieActiveTimeline.createNewInstantTime();
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
-    writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
+    writeBatch(client, commit6, commit5, Option.of(Arrays.asList(commit4)), commit4,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
@@ -1458,28 +1469,33 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
     ExecutorService service = Executors.newFixedThreadPool(2);
     HoodieTestUtils.init(storageConf, basePath);
     HoodieJavaWriteClient client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
     // perform 1 successful write
-    writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+    writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 100,
         0, true);
 
     // Perform 2 failed writes to table
-    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit2,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 100,
         0, false);
     client.close();
+
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
     client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
-    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 100,
         0, false);
     client.close();
     // refresh data generator to delete records generated from failed commits
     dataGen = new HoodieTestDataGenerator();
+    String commit4 = HoodieActiveTimeline.createNewInstantTime();
     // Create a successful commit
-    Future<List<WriteStatus>> commit3 = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
-        "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
+    Future<List<WriteStatus>> commit4Future = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
+        commit4, commit3, Option.of(Arrays.asList(commit4)), commit3, 100, dataGen::generateInserts,
         HoodieJavaWriteClient::bulkInsert, false, 100, 100, 0, true));
-    commit3.get();
+    commit4Future.get();
     HoodieTableMetaClient metaClient = createMetaClient();
 
     assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
@@ -1490,14 +1506,15 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
     // Await till enough time passes such that the first 2 failed commits heartbeats are expired
     boolean conditionMet = false;
     while (!conditionMet) {
-      conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+      conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
       Thread.sleep(2000);
     }
-    Future<List<WriteStatus>> commit4 = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
-        "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
+    String commit5 = HoodieActiveTimeline.createNewInstantTime();
+    Future<List<WriteStatus>> commit5Future = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
+        commit5, commit4, Option.of(Arrays.asList(commit5)), commit5, 100, dataGen::generateInserts,
         HoodieJavaWriteClient::bulkInsert, false, 100, 100, 0, true));
     Future<HoodieCleanMetadata> clean1 = service.submit(() -> new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean());
-    commit4.get();
+    commit5Future.get();
     clean1.get();
     client.close();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index bbdd34835ad..ef6cdde67bd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -129,6 +129,11 @@ public class SparkRDDWriteClient<T> extends
     return HoodieSparkTable.create(config, context, metaClient);
   }
 
+  @Override
+  protected void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
+    validateTimestampInternal(metaClient, instantTime);
+  }
+
   @Override
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 441ac9eb1ec..fea0a19dcdf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -104,6 +104,11 @@ public class HoodieSparkCopyOnWriteTable<T>
     super(config, context, metaClient);
   }
 
+  @Override
+  public void validateForLatestTimestamp(String instantTime) {
+    validateForLatestTimestampInternal(instantTime);
+  }
+
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
index 68aadf0cccf..0a8130f0788 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
@@ -56,6 +56,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
@@ -82,7 +83,7 @@ public class TestMultiWriterWithPreferWriterIngestion extends HoodieClientTestBa
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
+  @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ"})
   public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
     // create inserts X 1
     if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -123,67 +124,123 @@ public class TestMultiWriterWithPreferWriterIngestion extends HoodieClientTestBa
     SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
     SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
     // Create upserts, schedule cleaning, schedule compaction in parallel
-    String instant4 = HoodieActiveTimeline.createNewInstantTime();
     Future future1 = executors.submit(() -> {
       int numRecords = 100;
       String commitTimeBetweenPrevAndNew = instantTime2;
       try {
-        // For both COW and MOR table types the commit should not be blocked, since we are giving preference to ingestion.
-        createCommitWithUpserts(cfg, client1, instantTime3, commitTimeBetweenPrevAndNew, instant4, numRecords);
-        validInstants.add(instant4);
+        int counter = 0;
+        while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
+          // For both COW and MOR table types the commit should not be blocked, since we are giving preference to ingestion.
+          try {
+            String instant4 = HoodieActiveTimeline.createNewInstantTime();
+            createCommitWithUpserts(cfg, client1, instantTime3, commitTimeBetweenPrevAndNew, instant4, numRecords);
+            validInstants.add(instant4);
+            break;
+          } catch (IllegalArgumentException e) {
+            if (!e.getMessage().contains("Found later commit time")) {
+              throw e;
+            }
+          }
+        }
       } catch (Exception e1) {
         throw new RuntimeException(e1);
       }
     });
-    String instant5 = HoodieActiveTimeline.createNewInstantTime();
+    AtomicReference<String> instant5Ref = new AtomicReference<>();
     Future future2 = executors.submit(() -> {
       try {
-        client2.scheduleTableService(instant5, Option.empty(), TableServiceType.COMPACT);
+        int counter = 0;
+        while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
+          try {
+            String instant5 = HoodieActiveTimeline.createNewInstantTime();
+            client2.scheduleTableService(instant5, Option.empty(), TableServiceType.COMPACT);
+            instant5Ref.set(instant5);
+            break;
+          } catch (IllegalArgumentException e) {
+            if (!e.getMessage().contains("Found later commit time")) {
+              throw e;
+            }
+          }
+        }
       } catch (Exception e2) {
         if (tableType == HoodieTableType.MERGE_ON_READ) {
           throw new RuntimeException(e2);
         }
       }
     });
-    String instant6 = HoodieActiveTimeline.createNewInstantTime();
+    AtomicReference<String> instant6Ref = new AtomicReference<>();
     Future future3 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService(instant6, Option.empty(), TableServiceType.CLEAN);
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
+      int counter = 0;
+      while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
+        try {
+          String instant6 = HoodieActiveTimeline.createNewInstantTime();
+          client2.scheduleTableService(instant6, Option.empty(), TableServiceType.CLEAN);
+          instant6Ref.set(instant6);
+          break;
+        } catch (IllegalArgumentException e) {
+          if (!e.getMessage().contains("Found later commit time")) {
+            throw e;
+          }
+        } catch (Exception e2) {
+          throw new RuntimeException(e2);
+        }
       }
     });
     future1.get();
     future2.get();
     future3.get();
     // Create inserts, run cleaning, run compaction in parallel
-    String instant7 = HoodieActiveTimeline.createNewInstantTime();
     future1 = executors.submit(() -> {
       int numRecords = 100;
-      try {
-        createCommitWithInserts(cfg, client1, instantTime3, instant7, numRecords);
-        validInstants.add(instant7);
-      } catch (Exception e1) {
-        throw new RuntimeException(e1);
+      int counter = 0;
+      while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
+        try {
+          String instant7 = HoodieActiveTimeline.createNewInstantTime();
+          createCommitWithInserts(cfg, client1, instantTime3, instant7, numRecords);
+          validInstants.add(instant7);
+          break;
+        } catch (IllegalArgumentException e) {
+          if (!e.getMessage().contains("Found later commit time")) {
+            throw e;
+          }
+        } catch (Exception e1) {
+          throw new RuntimeException(e1);
+        }
       }
     });
     future2 = executors.submit(() -> {
-      try {
-        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(instant5);
-        client2.commitCompaction(instant5, compactionMetadata.getCommitMetadata().get(), Option.empty());
-        validInstants.add(instant5);
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
+      int counter = 0;
+      while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
+        try {
+          HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(instant5Ref.get());
+          client2.commitCompaction(instant5Ref.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+          validInstants.add(instant5Ref.get());
+          break;
+        } catch (IllegalArgumentException e) {
+          if (!e.getMessage().contains("Found later commit time")) {
+            throw e;
+          }
+        } catch (Exception e2) {
+          if (tableType == HoodieTableType.MERGE_ON_READ) {
+            Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
+          }
         }
       }
     });
     future3 = executors.submit(() -> {
-      try {
-        client2.clean(instant6, false);
-        validInstants.add(instant6);
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
+      int counter = 0;
+      while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
+        try {
+          client2.clean(instant6Ref.get(), false);
+          validInstants.add(instant6Ref.get());
+          break;
+        } catch (IllegalArgumentException e) {
+          if (!e.getMessage().contains("Found later commit time")) {
+            throw e;
+          }
+        } catch (Exception e2) {
+          throw new RuntimeException(e2);
+        }
       }
     });
     future1.get();
@@ -195,6 +252,10 @@ public class TestMultiWriterWithPreferWriterIngestion extends HoodieClientTestBa
     Assertions.assertTrue(validInstants.containsAll(completedInstants));
   }
 
+  private String getNextCommitTime() {
+    return HoodieActiveTimeline.createNewInstantTime();
+  }
+
   @ParameterizedTest
  @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
  public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) throws Exception {
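
Because the writers in this test race to pick instant times, the rewritten bodies above retry on the specific validation failure instead of assuming an ordering. The retry shape they all share, condensed (doWork is a hypothetical stand-in for createCommitWithUpserts, scheduleTableService, or clean):

    int counter = 0;
    while (counter++ < 5) {
      try {
        String instant = HoodieActiveTimeline.createNewInstantTime();
        doWork(instant);  // hypothetical stand-in for the per-writer action
        break;
      } catch (IllegalArgumentException e) {
        if (!e.getMessage().contains("Found later commit time")) {
          throw e;  // only the ordering-validation failure is retried
        }
      }
    }
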
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 196ab9cb434..3af0d3af9d0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2202,7 +2202,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     SparkRDDWriteClient client = getHoodieWriteClient(config);
 
     // Write 1 (Bulk insert)
-    String newCommitTime = "0000001";
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
     client.startCommitWithTime(newCommitTime);
     List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
@@ -2210,7 +2210,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(client);
 
     // Write 2 (inserts)
-    newCommitTime = "0000002";
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     client.startCommitWithTime(newCommitTime);
     records = dataGen.generateInserts(newCommitTime, 20);
     writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
@@ -2240,7 +2240,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
             replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));
 
     // trigger new write to mimic other writes succeeding before re-attempt.
-    newCommitTime = "0000003";
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     client.startCommitWithTime(newCommitTime);
     records = dataGen.generateInserts(newCommitTime, 20);
     writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index e9020034c07..624570fdd59 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -945,6 +945,34 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testOutOfOrderCommitTimestamps(boolean enableValidation) {
+    HoodieWriteConfig config = getConfigBuilder()
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withEnableTimestampOrderingValidation(enableValidation)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+      String commit1 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commit1);
+      String commit2 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commit2);
+      String commit3 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commit3);
+
+      // generate commit4's timestamp before commit5's, but start commit5 first; starting commit4 should then fail when timestamp ordering validation is enabled.
+      String commit4 = HoodieActiveTimeline.createNewInstantTime();
+      String commit5 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commit5);
+      if (enableValidation) {
+        assertThrows(IllegalArgumentException.class, () -> client.startCommitWithTime(commit4));
+      } else {
+        client.startCommitWithTime(commit4);
+      }
+    }
+  }
+
   @Test
   public void testPendingRestore() throws IOException {
     HoodieWriteConfig config = getConfigBuilder().withMetadataConfig(
@@ -2437,25 +2465,29 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
 
     // perform 1 successful commit
-    writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, true);
 
     // Perform 2 failed writes to table
-    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit1,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
     // refresh data generator to delete records generated from failed commits
     dataGen = new HoodieTestDataGenerator();
     // Perform 1 successful write
+    String commit4 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+    writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)), commit4,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, true);
     HoodieTableMetaClient metaClient = createMetaClient(basePath);
@@ -2467,12 +2499,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // Await till enough time passes such that the first 2 failed commits heartbeats are expired
     boolean conditionMet = false;
     while (!conditionMet) {
-      conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+      conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
       Thread.sleep(2000);
     }
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     // Perform 1 successful write
-    writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
+    String commit5 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit5)), commit5,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, true);
     client.clean();
@@ -2515,32 +2548,36 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     // Perform 1 successful write to table
-    writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, true);
 
     // Perform 1 failed write to table
-    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit2,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
     // Toggle cleaning policy to LAZY
     cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
     // Perform 2 failed writes to table
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
+    String commit4 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+    writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)), commit4,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
     // Await till enough time passes such that the 2 failed commits heartbeats are expired
     boolean conditionMet = false;
     while (!conditionMet) {
-      conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
+      conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit4);
       Thread.sleep(2000);
     }
     client.clean();
@@ -2548,13 +2585,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertTrue(timeline.getTimelineOfActions(
         CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
     // Perform 2 failed commits
+    String commit5 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit3)), commit3,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
+    String commit6 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
-    writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
+    writeBatch(client, commit6, commit5, Option.of(Arrays.asList(commit4)), commit4,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
         0, false);
     client.close();
@@ -2577,27 +2616,31 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     HoodieTestUtils.init(storageConf, basePath);
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, true));
     // perform 1 successful write
-    writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), 
commit1,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 100,
         0, true);
 
     // Perform 2 failed writes to table
-    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), 
commit2,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 100,
         0, false);
     client.close();
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, true));
-    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+    writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), 
commit3,
         100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 100,
         0, false);
     client.close();
     // refresh data generator to delete records generated from failed commits
     dataGen = new HoodieTestDataGenerator();
     // Create a successful commit
-    Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new 
SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, 
true)),
-        "400", "300", Option.of(Arrays.asList("400")), "300", 100, 
dataGen::generateInserts,
+    String commit4 = HoodieActiveTimeline.createNewInstantTime();
+    Future<JavaRDD<WriteStatus>> commit4Future = service.submit(() -> 
writeBatch(new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, true)),
+        commit4, commit3, Option.of(Arrays.asList(commit4)), commit3, 100, 
dataGen::generateInserts,
         SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
-    commit3.get();
+    commit4Future.get();
     HoodieTableMetaClient metaClient = createMetaClient(basePath);
 
     assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
@@ -2608,14 +2651,15 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     // Wait until the heartbeats of the first 2 failed commits have expired
     boolean conditionMet = false;
     while (!conditionMet) {
-      conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+      conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
       Thread.sleep(2000);
     }
-    Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new 
SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, 
true)),
-        "500", "400", Option.of(Arrays.asList("500")), "500", 100, 
dataGen::generateInserts,
+    String commit5 = HoodieActiveTimeline.createNewInstantTime();
+    Future<JavaRDD<WriteStatus>> commit5Future = service.submit(() -> 
writeBatch(new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, true)),
+        commit5, commit4, Option.of(Arrays.asList(commit5)), commit5, 100, 
dataGen::generateInserts,
         SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
     Future<HoodieCleanMetadata> clean1 = service.submit(() -> new 
SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, 
true)).clean());
-    commit4.get();
+    commit5Future.get();
     clean1.get();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
     assertTrue(timeline.getTimelineOfActions(
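
Side note on the pattern in the hunks above: the hardcoded instant literals ("100", "200", ...) are replaced with HoodieActiveTimeline.createNewInstantTime() because the new ordering validation requires every requested instant to sort after whatever is already on the timeline. A minimal sketch of the idiom (the assertion is shown only to illustrate the ordering guarantee):

    // Sketch: successive calls return timestamp-derived, monotonically increasing strings,
    // so each new instant sorts after the previous one lexicographically.
    String earlier = HoodieActiveTimeline.createNewInstantTime();
    String later = HoodieActiveTimeline.createNewInstantTime();
    assertTrue(earlier.compareTo(later) < 0);
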
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index bce99444dcf..d94fdab3f25 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -154,7 +154,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
     assertNoWriteErrors(statuses.collect());
     // verify that there is a commit
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
+    HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitsTimeline();
     assertEquals(1, timeline.findInstantsAfter("000", 
Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
     // Should have 100 records in table (check using Index), all in locations 
marked at commit
     HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, 
metaClient);
@@ -1084,7 +1084,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
       testTable.addReplaceCommit("00000000000004", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
 
       // run cleaner with failures
-      List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true, 
simulateMetadataFailure, 5, true);
+      List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true, 
simulateMetadataFailure, 5, true, Option.empty());
       assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
       assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
       assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
@@ -1131,16 +1131,16 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p1, CollectionUtils.createImmutableList(file1P1, file2P1));
         }
       });
-      commitWithMdt("10", part1ToFileId, testTable, metadataWriter);
-      testTable.addClean("15");
-      commitWithMdt("20", part1ToFileId, testTable, metadataWriter);
+      commitWithMdt(makeNewCommitTime(10, "%09d"), part1ToFileId, testTable, 
metadataWriter);
+      testTable.addClean(makeNewCommitTime(15, "%09d"));
+      commitWithMdt(makeNewCommitTime(20, "%09d"), part1ToFileId, testTable, 
metadataWriter);
 
       // add clean instant
       HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant("", "", ""),
           "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>(), Collections.emptyMap());
       HoodieCleanMetadata cleanMeta = new HoodieCleanMetadata("", 0L, 0,
-          "20", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), Collections.emptyMap());
-      testTable.addClean("30", cleanerPlan, cleanMeta);
+          makeNewCommitTime(20, "%09d"), "", new HashMap<>(), 
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), Collections.emptyMap());
+      testTable.addClean(makeNewCommitTime(30, "%09d"), cleanerPlan, 
cleanMeta);
 
       // add file in partition "part_2"
       String file3P2 = UUID.randomUUID().toString();
@@ -1150,8 +1150,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p2, CollectionUtils.createImmutableList(file3P2, file4P2));
         }
       });
-      commitWithMdt("30", part2ToFileId, testTable, metadataWriter);
-      commitWithMdt("40", part2ToFileId, testTable, metadataWriter);
+      commitWithMdt(makeNewCommitTime(30, "%09d"), part2ToFileId, testTable, 
metadataWriter);
+      commitWithMdt(makeNewCommitTime(40, "%09d"), part2ToFileId, testTable, 
metadataWriter);
 
       // empty commits
       String file5P2 = UUID.randomUUID().toString();
@@ -1161,25 +1161,25 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p2, CollectionUtils.createImmutableList(file5P2, file6P2));
         }
       });
-      commitWithMdt("50", part2ToFileId, testTable, metadataWriter);
-      commitWithMdt("60", part2ToFileId, testTable, metadataWriter);
+      commitWithMdt(makeNewCommitTime(50, "%09d"), part2ToFileId, testTable, 
metadataWriter);
+      commitWithMdt(makeNewCommitTime(60, "%09d"), part2ToFileId, testTable, 
metadataWriter);
 
       // archive commit 1, 2
       new HoodieTimelineArchiver<>(config, HoodieSparkTable.create(config, 
context, metaClient))
           .archiveIfRequired(context, false);
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      assertFalse(metaClient.getActiveTimeline().containsInstant("10"));
-      assertFalse(metaClient.getActiveTimeline().containsInstant("20"));
-
-      runCleaner(config);
-      assertFalse(testTable.baseFileExists(p1, "10", file1P1), "Clean old 
FileSlice in p1 by fallback to full clean");
-      assertFalse(testTable.baseFileExists(p1, "10", file2P1), "Clean old 
FileSlice in p1 by fallback to full clean");
-      assertFalse(testTable.baseFileExists(p2, "30", file3P2), "Clean old 
FileSlice in p2");
-      assertFalse(testTable.baseFileExists(p2, "30", file4P2), "Clean old 
FileSlice in p2");
-      assertTrue(testTable.baseFileExists(p1, "20", file1P1), "Latest 
FileSlice exists");
-      assertTrue(testTable.baseFileExists(p1, "20", file2P1), "Latest 
FileSlice exists");
-      assertTrue(testTable.baseFileExists(p2, "40", file3P2), "Latest 
FileSlice exists");
-      assertTrue(testTable.baseFileExists(p2, "40", file4P2), "Latest 
FileSlice exists");
+      assertFalse(metaClient.getActiveTimeline().containsInstant(makeNewCommitTime(10, "%09d")));
+      assertFalse(metaClient.getActiveTimeline().containsInstant(makeNewCommitTime(20, "%09d")));
+
+      runCleaner(config, false, false, 80, false, Option.empty());
+      assertFalse(testTable.baseFileExists(p1, makeNewCommitTime(10, "%09d"), 
file1P1), "Clean old FileSlice in p1 by fallback to full clean");
+      assertFalse(testTable.baseFileExists(p1, makeNewCommitTime(10, "%09d"), 
file2P1), "Clean old FileSlice in p1 by fallback to full clean");
+      assertFalse(testTable.baseFileExists(p2, makeNewCommitTime(30, "%09d"), 
file3P2), "Clean old FileSlice in p2");
+      assertFalse(testTable.baseFileExists(p2, makeNewCommitTime(30, "%09d"), 
file4P2), "Clean old FileSlice in p2");
+      assertTrue(testTable.baseFileExists(p1, makeNewCommitTime(20, "%09d"), 
file1P1), "Latest FileSlice exists");
+      assertTrue(testTable.baseFileExists(p1, makeNewCommitTime(20, "%09d"), 
file2P1), "Latest FileSlice exists");
+      assertTrue(testTable.baseFileExists(p2, makeNewCommitTime(40, "%09d"), 
file3P2), "Latest FileSlice exists");
+      assertTrue(testTable.baseFileExists(p2, makeNewCommitTime(40, "%09d"), 
file4P2), "Latest FileSlice exists");
     }
   }
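
These hunks drop short literal commit times ("10", "20", ...) in favor of makeNewCommitTime(seq, "%09d"), which yields zero-padded instants of a uniform width. Assuming the helper boils down to formatting the sequence number (which is what the padded strings above suggest), the mapping is simply:

    // Hedged sketch of the padding, assuming makeNewCommitTime(seq, fmt) ~ String.format(fmt, seq).
    String t10 = String.format("%09d", 10); // "000000010"
    String t20 = String.format("%09d", 20); // "000000020"
    // Zero-padding keeps lexicographic order in sync with numeric order: "000000010" < "000000020".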
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
index f0cc4c3c789..bd97c22c3cf 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
@@ -141,7 +141,7 @@ public class TestCleanerInsertAndCleanByVersions extends 
SparkClientFunctionalTe
           ? wrapRecordsGenFunctionForPreppedCalls(basePath(), storageConf(), 
context(), cfg, dataGen::generateUniqueUpdates)
           : dataGen::generateUniqueUpdates;
 
-      HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+      HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
       insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, 
recordInsertGenWrappedFunction, insertFn);
 
       Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = 
new HashMap<>();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e78ed757e8f..99f83e2ad43 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -443,15 +443,12 @@ public class TestCopyOnWriteRollbackActionExecutor 
extends HoodieClientRollbackT
     SparkRDDWriteClient clusteringClient = getHoodieWriteClient(
         ClusteringTestUtils.getClusteringConfig(basePath, 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, properties));
 
-    // Save an older instant for us to run clustering.
-    String clusteringInstant1 = HoodieActiveTimeline.createNewInstantTime();
+    // Let's just trigger scheduling of a clustering instant.
+    String clusteringInstant1 = 
ClusteringTestUtils.runClustering(clusteringClient, false, false);
 
-    // Create completed clustering commit
+    // Create another clustering commit (and complete it)
     ClusteringTestUtils.runClustering(clusteringClient, false, true);
 
-    // Now execute clustering on the saved instant and do not allow it to 
commit.
-    ClusteringTestUtils.runClusteringOnInstant(clusteringClient, false, false, 
clusteringInstant1);
-
     HoodieTable table = this.getHoodieTable(metaClient, 
getConfigBuilder().withEmbeddedTimelineServerEnabled(false).build());
     HoodieInstant needRollBackInstant = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, secondCommit);
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/MockCompactionStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/MockCompactionStrategy.java
new file mode 100644
index 00000000000..8572f444f2e
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/MockCompactionStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
+
+import java.util.List;
+
+public class MockCompactionStrategy extends LogFileSizeBasedCompactionStrategy 
{
+
+  @Override
+  public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig writeConfig,
+                                                     List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    HoodieCompactionPlan compactionPlan = super.generateCompactionPlan(writeConfig, operations, pendingCompactionPlans);
+    try {
+      // Stall plan generation so a competing writer can schedule a later compaction instant first.
+      Thread.sleep(10000);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status rather than swallowing it.
+      Thread.currentThread().interrupt();
+    }
+    return compactionPlan;
+  }
+}
\ No newline at end of file
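
MockCompactionStrategy exists only to widen the race window in the new out-of-order scheduling test: the 10-second sleep keeps one writer inside compaction planning while a second writer schedules a later compaction instant. It is wired in through the compaction config, as in this minimal sketch (the table name and path are illustrative; the builder calls mirror the helper added further down in this diff):

    // Sketch: plug the slow mock strategy into a write config so compaction planning
    // stalls long enough for a competing writer to win the scheduling race.
    HoodieWriteConfig slowPlanner = HoodieWriteConfig.newBuilder()
        .forTable("test-trip-table")
        .withPath("/tmp/hudi-sketch")
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCompactionStrategy(new MockCompactionStrategy())
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build())
        .build();
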
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index f9c2c82809e..96adcf69756 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -35,6 +35,7 @@ import 
org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -62,6 +63,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -139,7 +141,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsOne =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2, 
true);
+          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2, 
true, Option.empty());
       assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any 
partitions and clean any files");
       assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
       assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
@@ -158,7 +160,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsTwo =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4, 
true);
+          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4, 
true, Option.empty());
       assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any 
partitions and clean any files");
       assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
       assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
@@ -176,7 +178,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsThree =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6, 
true);
+          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6, 
true, Option.empty());
       assertEquals(0, hoodieCleanStatsThree.size(),
           "Must not clean any file. We have to keep 1 version before the 
latest commit time to keep");
       assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
@@ -192,7 +194,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsFour =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8, 
true);
+          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8, 
true, Option.empty());
      // enableBootstrapSourceClean would delete the bootstrap base file at the same time
       HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, 
p0);
 
@@ -222,7 +224,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsFive =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 
10, true);
+          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 
10, true, Option.empty());
 
       assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files 
since at least 2 commits are needed from last clean operation before "
           + "clean can be scheduled again");
@@ -243,7 +245,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, "00000000000011"),
           Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
       List<HoodieCleanStat> hoodieCleanStatsFive2 =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 
12, true);
+          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 
12, true, Option.empty());
       HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
       assertNull(cleanStat, "Must not clean any files");
       assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
@@ -455,28 +457,28 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       HoodieTestTable testTable = HoodieTestTable.of(metaClient);
       String p0 = "2020/01/01";
       // Make 3 files, one base file and 2 log files associated with base file
-      String file1P0 = 
testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+      String file1P0 = testTable.addDeltaCommit(makeNewCommitTime(0, 
"%09d")).getFileIdsWithBaseFilesInPartitions(p0).get(p0);
       Map<String, List<String>> part1ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
         {
           put(p0, CollectionUtils.createImmutableList(file1P0));
         }
       });
-      commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt(makeNewCommitTime(0, "%09d"), part1ToFileId, testTable, 
metadataWriter, true, true);
 
       // Make 2 files, one base file and 1 log files associated with base file
-      testTable.addDeltaCommit("001")
+      testTable.addDeltaCommit(makeNewCommitTime(1, "%09d"))
           .withBaseFilesInPartition(p0, file1P0).getLeft()
           .withLogFile(p0, file1P0, 3);
-      commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt(makeNewCommitTime(1, "%09d"), part1ToFileId, testTable, 
metadataWriter, true, true);
 
-      List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
+      List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, 3, false);
       assertEquals(3,
           getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
               .size(), "Must clean three files, one base and 2 log files");
-      assertFalse(testTable.baseFileExists(p0, "000", file1P0));
-      assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
-      assertTrue(testTable.baseFileExists(p0, "001", file1P0));
-      assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
+      assertFalse(testTable.baseFileExists(p0, makeNewCommitTime(0, "%09d"), 
file1P0));
+      assertFalse(testTable.logFilesExist(p0, makeNewCommitTime(0, "%09d"), 
file1P0, 1, 2));
+      assertTrue(testTable.baseFileExists(p0, makeNewCommitTime(1, "%09d"), 
file1P0));
+      assertTrue(testTable.logFileExists(p0, makeNewCommitTime(1, "%09d"), 
file1P0, 3));
     }
   }
 
@@ -506,30 +508,30 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p0, CollectionUtils.createImmutableList(file1P0));
         }
       });
-      commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt(makeNewCommitTime(0, "%09d"), part1ToFileId, testTable, 
metadataWriter, true, true);
 
       // Make 2 files, one base file and 1 log files associated with base file
-      testTable.addDeltaCommit("001")
+      testTable.addDeltaCommit(makeNewCommitTime(1, "%09d"))
           .withBaseFilesInPartition(p0, file1P0).getLeft()
           .withLogFile(p0, file1P0, 3);
-      commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt(makeNewCommitTime(1, "%09d"), part1ToFileId, testTable, 
metadataWriter, true, true);
 
       // Make 2 files, one base file and 1 log files associated with base file
-      testTable.addDeltaCommit("002")
+      testTable.addDeltaCommit(makeNewCommitTime(2, "%09d"))
           .withBaseFilesInPartition(p0, file1P0).getLeft()
           .withLogFile(p0, file1P0, 4);
-      commitWithMdt("002", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt(makeNewCommitTime(2, "%09d"), part1ToFileId, testTable, 
metadataWriter, true, true);
 
-      List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
+      List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, false, 
false, 3, false, Option.empty());
       assertEquals(3,
           getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
               .size(), "Must clean three files, one base and 2 log files");
-      assertFalse(testTable.baseFileExists(p0, "000", file1P0));
-      assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
-      assertTrue(testTable.baseFileExists(p0, "001", file1P0));
-      assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
-      assertTrue(testTable.baseFileExists(p0, "002", file1P0));
-      assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
+      assertFalse(testTable.baseFileExists(p0, makeNewCommitTime(0, "%09d"), file1P0));
+      assertFalse(testTable.logFilesExist(p0, makeNewCommitTime(0, "%09d"), 
file1P0, 1, 2));
+      assertTrue(testTable.baseFileExists(p0, makeNewCommitTime(1, "%09d"), 
file1P0));
+      assertTrue(testTable.logFileExists(p0, makeNewCommitTime(1, "%09d"), 
file1P0, 3));
+      assertTrue(testTable.baseFileExists(p0, makeNewCommitTime(2, "%09d"), 
file1P0));
+      assertTrue(testTable.logFileExists(p0, makeNewCommitTime(2, "%09d"), 
file1P0, 4));
     }
   }
 
@@ -600,7 +602,7 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       testTable.addDeletePartitionCommit(deleteInstant1, p1, 
Arrays.asList(file1P1, file2P1));
       testTable.addDeletePartitionCommit(deleteInstant2, p2, 
Arrays.asList(file1P2, file2P2));
 
-      runCleaner(config);
+      runCleaner(config, Option.of((Functions.Function0<String>) () -> 
HoodieActiveTimeline.createNewInstantTime()));
 
       assertFalse(testTable.baseFileExists(p1, commitInstant, file1P1), "p1 
cleaned");
       assertFalse(testTable.baseFileExists(p1, commitInstant, file2P1), "p1 
cleaned");
@@ -693,7 +695,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       commitWithMdt(thirdCommitTs, part3ToFileId, testTable, metadataWriter, 
true, true);
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
-      List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 
simulateFailureRetry, simulateMetadataFailure);
+      List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 0,
+          false, Option.of((Functions.Function0<String>) () -> HoodieActiveTimeline.createNewInstantTime()));
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       assertEquals(2, hoodieCleanStatsThree.size(), "Should clean one file 
each from both the partitions");
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 85a9c6759c2..d05e6b5548b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -47,12 +47,15 @@ import org.apache.hudi.config.HoodieLayoutConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
 import org.apache.hudi.table.action.rollback.RollbackUtils;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -61,6 +64,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -72,6 +76,13 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -81,6 +92,7 @@ import static 
org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @Tag("functional")
 public class TestHoodieSparkMergeOnReadTableCompaction extends 
SparkClientFunctionalTestHarness {
@@ -163,6 +175,100 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
     assertEquals(300, readTableTotalRecordsNum());
   }
 
+  @Test
+  public void testOutOfOrderCompactionSchedules() throws IOException, 
ExecutionException, InterruptedException {
+    HoodieWriteConfig config = getWriteConfigWithMockCompactionStrategy();
+    HoodieWriteConfig config2 = getWriteConfig();
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new 
Properties());
+    client = getHoodieWriteClient(config);
+
+    // write data and commit
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
+
+    // 2nd batch
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateUniqueUpdates(commit2, 50), 2);
+    client.startCommitWithTime(commit2);
+    List<WriteStatus> writeStatuses = client.upsert(records, 
commit2).collect();
+    List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    client.commitStats(commit2, context().parallelize(writeStatuses, 1), 
writeStats, Option.empty(), metaClient.getCommitActionType());
+
+    // schedule compaction
+    String compactionInstant1 = HoodieActiveTimeline.createNewInstantTime();
+    String compactionInstant2 = HoodieActiveTimeline.createNewInstantTime(10 * 
1000);
+    String commit3 = HoodieActiveTimeline.createNewInstantTime(15 * 1000);
+
+    final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+    final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    try {
+      final SparkRDDWriteClient client2 = getHoodieWriteClient(config2);
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      Future future1 = executors.submit(() -> {
+        try {
+          if (countDownLatch.await(20, TimeUnit.SECONDS)) {
+            List<WriteStatus> writeStatuses1 = client.upsert(records, 
commit3).collect();
+            List<HoodieWriteStat> writeStats1 = 
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+            client.commitStats(commit3, context().parallelize(writeStatuses1, 
1), writeStats1, Option.empty(), metaClient.getCommitActionType());
+
+            // compactionInstant1 sorts before the already-scheduled compactionInstant2
+            // (the mock strategy's 10s sleep lets writer 2 schedule first), so with
+            // timestamp ordering validation this call is expected to throw.
+            client.scheduleCompactionAtInstant(compactionInstant1, Option.empty());
+            fail("Should not have reached here");
+          } else {
+            fail("Should not have reached here");
+          }
+        } catch (Exception e) {
+          writer1Completed.set(true);
+        }
+      });
+
+      Future future2 = executors.submit(() -> {
+        try {
+          assertTrue(client2.scheduleCompactionAtInstant(compactionInstant2, 
Option.empty()));
+          writer2Completed.set(true);
+          countDownLatch.countDown();
+        } catch (Exception e) {
+          throw new HoodieException("Should not have reached here");
+        }
+      });
+
+      future2.get();
+      future1.get();
+
+      // Both futures should have completed: writer 2 scheduled compactionInstant2 successfully,
+      // and writer 1 hit the expected ordering failure for compactionInstant1.
+      assertTrue(writer1Completed.get() && writer2Completed.get());
+      client.close();
+      client2.close();
+    } finally {
+      executors.shutdownNow();
+    }
+  }
+
+  private HoodieWriteConfig getWriteConfigWithMockCompactionStrategy() {
+    MockCompactionStrategy compactionStrategy = new MockCompactionStrategy();
+    return getWriteConfig(compactionStrategy);
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return getWriteConfig(new LogFileSizeBasedCompactionStrategy());
+  }
+
+  private HoodieWriteConfig getWriteConfig(CompactionStrategy 
compactionStrategy) {
+    return HoodieWriteConfig.newBuilder()
+        .forTable("test-trip-table")
+        .withPath(basePath())
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withAutoCommit(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCompactionStrategy(compactionStrategy)
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withEnableTimestampOrderingValidation(true)
+        .build();
+  }
+
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, 
boolean enableTimelineServer) throws IOException {
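
The new test above leans on the validation this commit adds before a requested instant is created: if the timeline already holds a later instant, scheduling must fail fast rather than write an out-of-order requested file. A hedged sketch of that check, enabled here by withEnableTimestampOrderingValidation(true) (class and method names are illustrative, not the actual implementation; the "Found later commit time" phrase matches what the retry logic elsewhere in this diff looks for):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;

    // Illustrative sketch of a timestamp ordering check, not the real Hudi code.
    class TimestampOrderingSketch {
      static void validateNewInstantTime(HoodieTableMetaClient metaClient, String newInstantTime) {
        Option<HoodieInstant> last = metaClient.getActiveTimeline().lastInstant();
        if (last.isPresent() && last.get().getTimestamp().compareTo(newInstantTime) > 0) {
          throw new IllegalArgumentException("Found later commit time " + last.get().getTimestamp()
              + " than the requested instant time " + newInstantTime);
        }
      }
    }
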
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index ceeae9d107f..9bb44985b81 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -71,30 +72,34 @@ public class HoodieCleanerTestBase extends 
HoodieClientTestBase {
     return metadata;
   }
 
+  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws 
IOException {
+    return runCleaner(config, Option.empty());
+  }
+
   /**
    * Helper to run cleaner and collect Clean Stats.
    *
    * @param config HoodieWriteConfig
    */
-  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws 
IOException {
-    return runCleaner(config, false, false, 1, false);
+  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, 
Option<Functions.Function0<String>> commitTimeGenerationFunc) throws 
IOException {
+    return runCleaner(config, false, false, 1, false, 
commitTimeGenerationFunc);
   }
 
   protected List<HoodieCleanStat> 
runCleanerWithInstantFormat(HoodieWriteConfig config, boolean 
needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, false, 1, needInstantInHudiFormat);
+    return runCleaner(config, false, false, 1, needInstantInHudiFormat, 
Option.empty());
   }
 
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int 
firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, false, firstCommitSequence, 
needInstantInHudiFormat);
+    return runCleaner(config, false, false, firstCommitSequence, 
needInstantInHudiFormat, Option.empty());
   }
 
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean 
simulateRetryFailure) throws IOException {
-    return runCleaner(config, simulateRetryFailure, false, 1, false);
+    return runCleaner(config, simulateRetryFailure, false, 1, false, 
Option.empty());
   }
 
   protected List<HoodieCleanStat> runCleaner(
       HoodieWriteConfig config, boolean simulateRetryFailure, boolean 
simulateMetadataFailure) throws IOException {
-    return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 
1, false);
+    return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 
1, false, Option.empty());
   }
 
   /**
@@ -104,9 +109,10 @@ public class HoodieCleanerTestBase extends 
HoodieClientTestBase {
    */
   protected List<HoodieCleanStat> runCleaner(
       HoodieWriteConfig config, boolean simulateRetryFailure, boolean 
simulateMetadataFailure,
-      Integer firstCommitSequence, boolean needInstantInHudiFormat) throws 
IOException {
+      Integer firstCommitSequence, boolean needInstantInHudiFormat, 
Option<Functions.Function0<String>> commitTimeGenerationFunc) throws 
IOException {
     SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
-    String cleanInstantTs = needInstantInHudiFormat ? 
makeNewCommitTime(firstCommitSequence, "%014d") : 
makeNewCommitTime(firstCommitSequence, "%09d");
+    String cleanInstantTs = commitTimeGenerationFunc.isPresent() ? 
commitTimeGenerationFunc.get().apply() :
+        (needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, 
"%014d") : makeNewCommitTime(firstCommitSequence, "%09d"));
     HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
 
     if (null == cleanMetadata1) {
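
The net effect of the runCleaner changes above: every overload now funnels into one method taking an Option<Functions.Function0<String>>, so a test can either keep the old sequence-formatted clean instant or supply a timeline-generated one. Both call shapes, as used in this diff:

    // Old behavior: clean instant derived from the sequence number and padding format.
    runCleaner(config, false, false, 1, false, Option.empty());
    // New option: let the timeline generate the clean instant time.
    runCleaner(config, Option.of((Functions.Function0<String>) () -> HoodieActiveTimeline.createNewInstantTime()));
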
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 96853950d50..00fcbfbe044 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -28,11 +28,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JavaConversions
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.{DataFrame, _}
 import org.apache.spark.sql.functions.{col, not}
@@ -41,7 +41,6 @@ import org.junit.jupiter.api._
 
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.stream.Collectors
-
 import scala.collection.JavaConverters._
 import scala.collection.{JavaConverters, mutable}
 
@@ -178,11 +177,26 @@ class RecordLevelIndexTestBase extends 
HoodieSparkClientTestBase {
     }
     val latestBatchDf = 
spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2))
     latestBatchDf.cache()
-    latestBatchDf.write.format("org.apache.hudi")
-      .options(hudiOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, operation)
-      .mode(saveMode)
-      .save(basePath)
+    var counter = 0
+    var succeeded = false
+    while (!succeeded && counter < 5) {
+      try {
+        latestBatchDf.write.format("org.apache.hudi")
+          .options(hudiOpts)
+          .option(DataSourceWriteOptions.OPERATION.key, operation)
+          .mode(saveMode)
+          .save(basePath)
+        succeeded = true
+      } catch {
+        // With HUDI-7507, multi-writer runs may trip the timestamp ordering validation
+        // as an IllegalArgumentException, hence the retries.
+        case e: IllegalArgumentException => {
+          if (!e.getMessage.contains("Found later commit time")) {
+            throw new HoodieException("Unexpected exception thrown", e)
+          }
+          counter += 1
+        }
+      }
+    }
     val deletedDf = calculateMergedDf(latestBatchDf, operation)
     deletedDf.cache()
     if (validate) {
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index 8e87755c294..171dd8cb6bd 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.internal;
 
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -74,7 +75,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     // execute N rounds
     for (int i = 0; i < 2; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
           STRUCT_TYPE, populateMetaFields, sorted);
@@ -116,7 +117,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     for (int i = 0; i < 1; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
           STRUCT_TYPE, populateMetaFields, sorted);
@@ -161,7 +162,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
 
-    String instantTime = "001";
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
     HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
         STRUCT_TYPE, true, false);
 
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index 61ceaebaee6..8c786068943 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.internal;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -75,7 +76,7 @@ public class TestHoodieDataSourceInternalWriter extends
       throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    String instantTime = "001";
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
     // init writer
     HoodieDataSourceInternalWriter dataSourceInternalWriter =
         new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, new DataSourceOptions(extraMetadata), 
populateMetaFields, false);
@@ -163,7 +164,7 @@ public class TestHoodieDataSourceInternalWriter extends
 
     // execute N rounds
     for (int i = 0; i < 2; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieDataSourceInternalWriter dataSourceInternalWriter =
           new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, new 
DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
@@ -210,7 +211,7 @@ public class TestHoodieDataSourceInternalWriter extends
 
     // execute N rounds
     for (int i = 0; i < 3; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieDataSourceInternalWriter dataSourceInternalWriter =
           new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, new 
DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
@@ -258,7 +259,7 @@ public class TestHoodieDataSourceInternalWriter extends
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
 
-    String instantTime0 = "00" + 0;
+    String instantTime0 = HoodieActiveTimeline.createNewInstantTime();
     // init writer
     HoodieDataSourceInternalWriter dataSourceInternalWriter =
         new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, new 
DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
@@ -298,7 +299,7 @@ public class TestHoodieDataSourceInternalWriter extends
     assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, 
size, Option.empty(), Option.empty());
 
     // 2nd batch. abort in the end
-    String instantTime1 = "00" + 1;
+    String instantTime1 = HoodieActiveTimeline.createNewInstantTime();
     dataSourceInternalWriter =
         new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf,
             new DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, 
false);
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
index 206d4931b15..1050173ff11 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.spark3.internal;
 
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -77,7 +78,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     // execute N rounds
     for (int i = 0; i < 2; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000),
           RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted);
@@ -123,7 +124,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
 
-    String instantTime = "001";
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
     HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000),
         RANDOM.nextLong(), STRUCT_TYPE, true, false);
 
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
index 64042f2ebbb..0db60439075 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -21,6 +21,7 @@ package org.apache.hudi.spark3.internal;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -77,8 +78,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
   private void testDataSourceWriterInternal(Map<String, String> extraMetadata, 
Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws 
Exception {
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    String instantTime = "001";
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
     // init writer
     HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
         new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, 
false);
@@ -167,7 +167,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
 
     // execute N rounds
     for (int i = 0; i < 2; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
           new HoodieDataSourceInternalBatchWrite(instantTime, cfg, 
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), 
populateMetaFields, false);
@@ -214,7 +214,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
 
     // execute N rounds
     for (int i = 0; i < 3; i++) {
-      String instantTime = "00" + i;
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
           new HoodieDataSourceInternalBatchWrite(instantTime, cfg, 
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), 
populateMetaFields, false);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 41ab16d7bfd..881b6599f77 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -403,7 +403,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   private HoodieWriteConfig getWriteConfig() {
     return getConfigBuilder(basePath(), metaClient)
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(5).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .build();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 319aa8540a4..696bea22334 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -105,22 +105,28 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .enable(false).build())
         .build();
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    String commit2 = HoodieActiveTimeline.createNewInstantTime(10);
+    String commit3 = HoodieActiveTimeline.createNewInstantTime(100);
+    String commit4 = HoodieActiveTimeline.createNewInstantTime(200);
+    String commit5 = HoodieActiveTimeline.createNewInstantTime(300);
+    String commit6 = HoodieActiveTimeline.createNewInstantTime(400);
 
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
-      Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, 
INSERT, null, "100");
-      Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, 
INSERT, null, "200");
-      Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, 
INSERT, null, "300");
-      Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, 
INSERT, null, "400");
-      Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, 
INSERT, null, "500");
+      Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, 
INSERT, null, commit1);
+      Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, 
INSERT, null, commit2);
+      Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, 
INSERT, null, commit3);
+      Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, 
INSERT, null, commit4);
+      Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, INSERT, null, commit5);
 
       // read everything upto latest
       readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey());
 
-      // even if the begin timestamp is archived (100), full table scan should kick in, but should filter for records having commit time > 100
-      readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("100"), 400, inserts5.getKey());
+      // even if the begin timestamp is archived (commit1), a full table scan should kick in, but it should filter for records with commit time > commit1
+      readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of(commit1), 400, inserts5.getKey());
 
       // even if the read upto latest is set, if begin timestamp is in active 
timeline, only incremental should kick in.
-      readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("400"), 100, inserts5.getKey());
+      readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of(commit4), 100, inserts5.getKey());
 
       // read just the latest
       readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 
Option.empty(), 100, inserts5.getKey());
@@ -128,7 +134,7 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
       // ensure checkpoint does not move
       readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 
Option.of(inserts5.getKey()), 0, inserts5.getKey());
 
-      Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, 
INSERT, null, "600");
+      Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, 
INSERT, null, commit6);
 
       // insert new batch and ensure the checkpoint moves
       readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 
Option.of(inserts5.getKey()), 100, inserts6.getKey());
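
One more instance of the same theme: the six commit times are pre-allocated with growing offsets so that writes issued back-to-back still carry strictly increasing timestamps. Assuming the long argument is a millisecond offset added to the current time (which is how commit2 through commit6 read above), the idiom is:

    // Sketch: pre-allocating spaced-out instant times for sequential writes.
    String base = HoodieActiveTimeline.createNewInstantTime();        // now
    String plusTen = HoodieActiveTimeline.createNewInstantTime(10);   // now + 10 ms (assumed semantics)
    // The spacing guarantees base < plusTen, so no write trips the ordering validation.
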
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 2a011cd9812..dd1d15940da 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -220,7 +220,7 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
   private HoodieWriteConfig getWriteConfig() {
     return getConfigBuilder(basePath(), metaClient)
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(5).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .build();
