This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7943e1e18849 [HUDI-9188] Handle out of order deletes in Record Level Index (#13830)
7943e1e18849 is described below

commit 7943e1e18849af9ae826675738de220005b876a9
Author: Tim Brown <[email protected]>
AuthorDate: Fri Sep 5 14:28:08 2025 -0400

    [HUDI-9188] Handle out of order deletes in Record Level Index (#13830)
    
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  68 +++++++++----
 .../hudi/index/simple/TestGlobalSimpleIndex.java   |   1 +
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   1 +
 .../table/read/TestHoodieFileGroupReaderBase.java  |  14 +--
 .../common/testutils/HoodieTestDataGenerator.java  | 112 ++++++++++++++-------
 .../source/TestStreamReadMonitoringFunction.java   |   2 +-
 .../table/TestHoodieFileGroupReaderOnFlink.java    |   5 +-
 .../hudi/functional/RecordLevelIndexTestBase.scala |   9 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   5 +-
 .../apache/hudi/functional/TestMORDataSource.scala |   5 +-
 .../TestPartitionedRecordLevelIndex.scala          |  56 +++++++++--
 .../hudi/functional/TestRecordLevelIndex.scala     |  68 ++++++++++---
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |   4 +-
 .../TestHoodieMultiTableDeltaStreamer.java         |  11 +-
 .../testutils/sources/AbstractBaseTestSource.java  |   8 +-
 15 files changed, 260 insertions(+), 109 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index c90bbcb5172b..2ffe211a113a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.index;
 
 import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
@@ -38,7 +39,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -505,10 +506,13 @@ public class HoodieIndexUtils {
   /**
    * Merge tagged incoming records with existing records in case of partition 
path updated.
    */
-  public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
-      HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations,
-      HoodieWriteConfig config,
-      HoodieTable hoodieTable) {
+  public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesAndDeletionsIfNeeded(HoodieData<Pair<HoodieRecord<R>,
+                                                                               
              Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations,
+                                                                               
              boolean shouldUpdatePartitionPath,
+                                                                               
              HoodieWriteConfig config,
+                                                                               
              HoodieTable hoodieTable,
+                                                                               
              HoodieReaderContext<R> readerContext,
+                                                                               
              SerializableSchema writerSchema) {
     boolean isExpressionPayload = 
config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload");
     Pair<HoodieWriteConfig, BaseKeyGenerator> keyGeneratorWriteConfigOpt =
         getKeygenAndUpdatedWriteConfig(config, 
hoodieTable.getMetaClient().getTableConfig(), isExpressionPayload);
@@ -527,19 +531,14 @@ public class HoodieIndexUtils {
         .map(p -> Pair.of(p.getRight().get().getPartitionPath(), 
p.getRight().get().getFileId()))
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
     // define the buffered record merger.
-    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
-    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
     TypedProperties properties = 
readerContext.getMergeProps(updatedConfig.getProps());
     RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
-    readerContext.initRecordMergerForIngestion(config.getProps());
     // Create a reader context for the existing records. In the case of 
merge-into commands, the incoming records
     // can be using an expression payload so here we rely on the table's 
configured payload class if it is required.
     ReaderContextFactory<R> readerContextFactoryForExistingRecords = 
(ReaderContextFactory<R>) hoodieTable.getContext()
         .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), 
hoodieTable.getMetaClient().getTableConfig().getProps());
     RecordContext<R> existingRecordContext = 
readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
-    SerializableSchema writerSchema = new 
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
     SerializableSchema writerSchemaWithMetaFields = new 
SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(), 
updatedConfig.allowOperationMetadataField()));
     AvroSchemaCache.intern(writerSchema.get());
     AvroSchemaCache.intern(writerSchemaWithMetaFields.get());
@@ -570,10 +569,7 @@ public class HoodieIndexUtils {
           }
           HoodieRecord<R> existing = existingOpt.get();
           Schema writeSchema = writerSchema.get();
-          if (incoming.isDelete(writeSchema, properties)) {
-            // incoming is a delete: force tag the incoming to the old 
partition
-            return 
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), 
existing.getCurrentLocation())).iterator();
-          }
+
           Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(
               incoming, existing, writeSchema, 
writerSchemaWithMetaFields.get(), updatedConfig,
               recordMerger, keyGenerator, incomingRecordContext, 
existingRecordContext, orderingFieldsArray, properties, isExpressionPayload);
@@ -589,16 +585,34 @@ public class HoodieIndexUtils {
           if (Objects.equals(merged.getPartitionPath(), 
existing.getPartitionPath())) {
             // merged record has the same partition: route the merged result 
to the current location as an update
             return Collections.singletonList(tagRecord(merged, 
existing.getCurrentLocation())).iterator();
-          } else {
+          } else if (shouldUpdatePartitionPath) {
             // merged record has a different partition: issue a delete to the 
old partition and insert the merged record to the new partition
             HoodieRecord<R> deleteRecord = createDeleteRecord(updatedConfig, 
existing.getKey());
             deleteRecord.setIgnoreIndexUpdate(true);
             return Arrays.asList(tagRecord(deleteRecord, 
existing.getCurrentLocation()), merged).iterator();
+          } else {
+            // merged record has a different partition path but 
shouldUpdatePartitionPath is false, treat as an update to the existing location
+            return 
Collections.singletonList(merged.newInstance(existing.getKey())).iterator();
           }
         });
     return taggedUpdatingRecords.union(taggedNewRecords);
   }
 
+  /**
+   * Tags the incoming records with their existing locations if any is found.
+   * Global indexing can support moving records across partitions. To do so, 
the tagging logic will result in a delete to the old partition and an insert 
into the new partition. When this happens
+   * with Merge-on-Read tables, the record key is present in multiple 
locations until the file slices are compacted. If the index relies on reading 
the keys directly from the file slices, then
+   * `mayContainDuplicateLookup` must be set to true to account for this. If 
the lookup will only contain the latest location of the record key, then 
`mayContainDuplicateLookup` is set to false.
+   * @param incomingRecords The new data being written to the table
+   * @param keyAndExistingLocations The existing locations of the records in 
the table
+   * @param mayContainDuplicateLookup Set to true if the existing locations 
may contain multiple entries for the same record key
+   * @param shouldUpdatePartitionPath When true, the index will handle 
partition path updates by merging the incoming record with the existing record 
and determining if the partition path has changed.
+   *                                  When false, the incoming record will 
always be tagged to the existing location's partition path.
+   * @param config the writer's configuration
+   * @param table the table that is being updated
+   * @return the incoming records tagged with their existing locations if any 
is found
+   * @param <R> the type of the data in the record
+   */
   public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
       HoodieData<HoodieRecord<R>> incomingRecords,
       HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
@@ -606,11 +620,20 @@ public class HoodieIndexUtils {
       boolean shouldUpdatePartitionPath,
       HoodieWriteConfig config,
       HoodieTable table) {
-    final HoodieRecordMerger merger = config.getRecordMerger();
-
     HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
         incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
 
+    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
table.getContext()
+        .getReaderContextFactoryForWrite(table.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
+    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+    readerContext.initRecordMergerForIngestion(config.getProps());
+    TypedProperties properties = 
readerContext.getMergeProps(config.getProps());
+    SerializableSchema writerSchema = new 
SerializableSchema(config.getWriteSchema());
+    boolean isCommitTimeOrdered = readerContext.getMergeMode() == 
RecordMergeMode.COMMIT_TIME_ORDERING;
+    // if the index is not updating the partition of the record, and the table 
is COW, then we do not need to do merging at
+    // this phase since the writer path will merge when rewriting the files as 
part of the upsert operation.
+    boolean requiresMergingWithOlderRecordVersion = shouldUpdatePartitionPath 
|| table.getMetaClient().getTableConfig().getTableType() == 
HoodieTableType.MERGE_ON_READ;
+
     // Pair of incoming record and the global location if meant for merged 
lookup in later stage
     HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
         = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
@@ -619,9 +642,10 @@ public class HoodieIndexUtils {
           Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
           if (currentLocOpt.isPresent()) {
             HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
-            boolean shouldDoMergedLookUpThenTag = mayContainDuplicateLookup
-                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());
-            if (shouldUpdatePartitionPath && shouldDoMergedLookUpThenTag) {
+            boolean shouldDoMergedLookUpThenTag = mayContainDuplicateLookup // 
handle event time ordering updates
+                || shouldUpdatePartitionPath && 
!Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath())  // handle partition updates
+                || (!isCommitTimeOrdered && 
incomingRecord.isDelete(writerSchema.get(), properties)); // handle event time 
ordering deletes
+            if (requiresMergingWithOlderRecordVersion && 
shouldDoMergedLookUpThenTag) {
               // the pair's right side is a non-empty Option, which indicates 
that a merged lookup will be performed
               // at a later stage.
               return Pair.of(incomingRecord, currentLocOpt);
@@ -637,8 +661,8 @@ public class HoodieIndexUtils {
             return Pair.of(incomingRecord, Option.empty());
           }
         });
-    return shouldUpdatePartitionPath
-        ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, 
config, table)
+    return requiresMergingWithOlderRecordVersion
+        ? 
mergeForPartitionUpdatesAndDeletionsIfNeeded(incomingRecordsAndLocations, 
shouldUpdatePartitionPath, config, table, readerContext, writerSchema)
         : incomingRecordsAndLocations.map(Pair::getLeft);
   }
 
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
index 5eeb9575ef81..f27cdd7c8ccb 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
@@ -131,6 +131,7 @@ class TestGlobalSimpleIndex extends HoodieCommonTestHarness 
{
     props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
     return HoodieWriteConfig.newBuilder()
         .withPath(basePath)
+        .withSchema(SCHEMA.toString())
         .withIndexConfig(HoodieIndexConfig.newBuilder()
             .fromProperties(props)
             .withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index aa7ce1e4bef5..4a0217e8e8ea 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -217,6 +217,7 @@ public class TestHoodieGlobalBloomIndex extends 
TestHoodieMetadataBase {
   @Test
   public void testTagLocation() throws Exception {
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withSchema(SCHEMA.toString())
         .withIndexConfig(HoodieIndexConfig.newBuilder()
             .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
             .withGlobalBloomIndexUpdatePartitionPath(false)
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index a91de164f21c..e03b9b5285f1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -214,6 +214,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // Initial commit. rider column gets value of rider-002
       List<HoodieRecord> initialRecords = dataGen.generateInserts("002", 100);
+      long initialTs = (long) ((GenericRecord) 
initialRecords.get(0).getData()).get("timestamp");
       commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), true, 0, recordMergeMode,
@@ -222,7 +223,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       // The updates have rider values as rider-001 and the existing records 
have rider values as rider-002
       // timestamp is 0 for all records so will not be considered
       // All updates in this batch will be ignored as rider values are smaller 
and timestamp value is same
-      List<HoodieRecord> updates = dataGen.generateUniqueUpdates("001", 5);
+      List<HoodieRecord> updates = dataGen.generateUniqueUpdates("001", 5, 
initialTs);
       List<HoodieRecord> allRecords = initialRecords;
       List<HoodieRecord> unmergedRecords = CollectionUtils.combine(updates, 
allRecords);
       commitToTable(updates, UPSERT.value(), false, writeConfigs);
@@ -233,7 +234,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       // The updates have rider values as rider-003 and the existing records 
have rider values as rider-002
       // timestamp is 0 for all records so will not be considered
       // All updates in this batch will reflect in the final records
-      List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates("003", 10);
+      List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates("003", 10, 
initialTs);
       List<HoodieRecord> finalRecords = mergeRecordLists(updates2, allRecords);
       commitToTable(updates2, UPSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
@@ -579,7 +580,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType 
diskMapType) throws Exception {
     Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING, true));
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
-      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), true, 
writeConfigs);
+      List<HoodieRecord> recordList = dataGen.generateInserts("001", 100);
+      long timestamp = (long) ((GenericRecord) 
recordList.get(0).getData()).get("timestamp");
+      commitToTable(recordList, INSERT.value(), true, writeConfigs);
       String baseMapPath = Files.createTempDirectory(null).toString();
       HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
       Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
@@ -616,8 +619,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
             assertEquals(keyBased.getRecordKey(), recordKey);
             assertEquals(positionBased.getRecordKey(), recordKey);
             assertEquals(avroSchema, 
readerContext.getRecordContext().getSchemaFromBufferRecord(keyBased));
-            // generate field value is hardcoded as 0 for ordering field: 
timestamp, see HoodieTestDataGenerator#generateRandomValue
-            
assertEquals(readerContext.getRecordContext().convertValueToEngineType(0L), 
positionBased.getOrderingValue());
+            
assertEquals(readerContext.getRecordContext().convertValueToEngineType(timestamp),
 positionBased.getOrderingValue());
           }
         }
       }
@@ -628,7 +630,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     Map<String, String> configMapping = new HashMap<>();
     configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
KEY_FIELD_NAME);
     configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
PARTITION_FIELD_NAME);
-    configMapping.put(HoodieTableConfig.ORDERING_FIELDS.key(), 
ORDERING_FIELD_NAME);
+    configMapping.put(HoodieTableConfig.ORDERING_FIELDS.key(), recordMergeMode 
!= RecordMergeMode.COMMIT_TIME_ORDERING ? ORDERING_FIELD_NAME : "");
     configMapping.put("hoodie.payload.ordering.field", ORDERING_FIELD_NAME);
     configMapping.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
     configMapping.put("hoodie.insert.shuffle.parallelism", "4");
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 838acad02ecd..be13209c9165 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -314,19 +314,19 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
   }
 
-  public IndexedRecord generateRandomValueAsPerSchema(String schemaStr, 
HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+  public IndexedRecord generateRandomValueAsPerSchema(String schemaStr, 
HoodieKey key, String commitTime, boolean isFlattened, long timestamp) throws 
IOException {
     if (TRIP_FLATTENED_SCHEMA.equals(schemaStr)) {
-      return generateRandomValue(key, commitTime, true);
+      return generateRandomValue(key, commitTime, true, timestamp);
     } else if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
-      return generateRandomValue(key, commitTime, isFlattened);
+      return generateRandomValue(key, commitTime, isFlattened, timestamp);
     } else if (TRIP_ENCODED_DECIMAL_SCHEMA.equals(schemaStr)) {
-      return generatePayloadForTripEncodedDecimalSchema(key, commitTime);
+      return generatePayloadForTripEncodedDecimalSchema(key, commitTime, 
timestamp);
     } else if (TRIP_SCHEMA.equals(schemaStr)) {
-      return generatePayloadForTripSchema(key, commitTime);
+      return generatePayloadForTripSchema(key, commitTime, timestamp);
     } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
-      return generatePayloadForShortTripSchema(key, commitTime);
+      return generatePayloadForShortTripSchema(key, commitTime, timestamp);
     } else if (TRIP_NESTED_EXAMPLE_SCHEMA.equals(schemaStr)) {
-      return generateNestedExampleRandomValue(key, commitTime);
+      return generateNestedExampleRandomValue(key, commitTime, timestamp);
     } else if 
(TRIP_EXAMPLE_SCHEMA_WITH_PAYLOAD_SPECIFIC_COLS.equals(schemaStr)) {
       return generateRandomValueWithColumnRequired(key, commitTime);
     }
@@ -357,11 +357,11 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
    * @throws IOException
    */
   private IndexedRecord generateRandomValue(HoodieKey key, String instantTime, 
boolean isFlattened) {
-    return generateRandomValue(key, instantTime, isFlattened, 0);
+    return generateRandomValue(key, instantTime, isFlattened, 
System.currentTimeMillis());
   }
 
   private IndexedRecord generateNestedExampleRandomValue(HoodieKey key, String 
instantTime) {
-    return generateNestedExampleRandomValue(key, instantTime, 0);
+    return generateNestedExampleRandomValue(key, instantTime, 
System.currentTimeMillis());
   }
 
   private IndexedRecord generateRandomValue(HoodieKey key, String instantTime, 
boolean isFlattened, long timestamp) {
@@ -370,7 +370,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
         false, isFlattened);
   }
 
-  private IndexedRecord generateNestedExampleRandomValue(HoodieKey key, String 
instantTime, int ts) {
+  private IndexedRecord generateNestedExampleRandomValue(HoodieKey key, String 
instantTime, long ts) {
     return generateNestedExampleGenericRecord(
         key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, 
"driver-" + instantTime, ts,
         false);
@@ -379,34 +379,34 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   /**
    * Generates a new avro record with TRIP_ENCODED_DECIMAL_SCHEMA, retaining 
the key if optionally provided.
    */
-  public IndexedRecord generatePayloadForTripEncodedDecimalSchema(HoodieKey 
key, String commitTime) {
-    return generateRecordForTripEncodedDecimalSchema(key.getRecordKey(), 
"rider-" + commitTime, "driver-" + commitTime, 0);
+  public IndexedRecord generatePayloadForTripEncodedDecimalSchema(HoodieKey 
key, String commitTime, long timestamp) {
+    return generateRecordForTripEncodedDecimalSchema(key.getRecordKey(), 
"rider-" + commitTime, "driver-" + commitTime, timestamp);
   }
 
   /**
    * Generates a new avro record with TRIP_SCHEMA, retaining the key if 
optionally provided.
    */
-  public IndexedRecord generatePayloadForTripSchema(HoodieKey key, String 
commitTime) {
-    return generateRecordForTripSchema(key.getRecordKey(), "rider-" + 
commitTime, "driver-" + commitTime, 0);
+  public IndexedRecord generatePayloadForTripSchema(HoodieKey key, String 
commitTime, long timestamp) {
+    return generateRecordForTripSchema(key.getRecordKey(), "rider-" + 
commitTime, "driver-" + commitTime, timestamp);
   }
 
-  public IndexedRecord generatePayloadForShortTripSchema(HoodieKey key, String 
commitTime) {
-    return generateRecordForShortTripSchema(key.getRecordKey(), "rider-" + 
commitTime, "driver-" + commitTime, 0);
+  public IndexedRecord generatePayloadForShortTripSchema(HoodieKey key, String 
commitTime, long timestamp) {
+    return generateRecordForShortTripSchema(key.getRecordKey(), "rider-" + 
commitTime, "driver-" + commitTime, timestamp);
   }
 
   /**
    * Generates a new avro record of the above schema format for a delete.
    */
-  private IndexedRecord generateRandomDeleteValue(HoodieKey key, String 
instantTime) throws IOException {
-    return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), 
"rider-" + instantTime, "driver-" + instantTime, 0,
+  private IndexedRecord generateRandomDeleteValue(HoodieKey key, String 
instantTime, long timestamp) throws IOException {
+    return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), 
"rider-" + instantTime, "driver-" + instantTime, timestamp,
         true, false);
   }
 
   /**
    * Generates a new avro record of the above schema format, retaining the key 
if optionally provided.
    */
-  private IndexedRecord generateAvroPayload(HoodieKey key, String instantTime) 
{
-    return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), 
"rider-" + instantTime, "driver-" + instantTime, 0);
+  private IndexedRecord generateAvroPayload(HoodieKey key, String instantTime, 
long timestamp) {
+    return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), 
"rider-" + instantTime, "driver-" + instantTime, timestamp);
   }
 
   public GenericRecord generateGenericRecord(String rowKey, String 
partitionPath, String riderName, String driverName,
@@ -785,7 +785,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
   }
 
   public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime, 
Integer n, String schemaStr) {
-    return generateInsertsStream(commitTime, n, false, 
schemaStr).collect(Collectors.toList());
+    return generateInsertsStream(commitTime, n, false, schemaStr, 
System.currentTimeMillis()).collect(Collectors.toList());
+  }
+
+  public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime, 
Integer n, String schemaStr, long timestamp) {
+    return generateInsertsStream(commitTime, n, false, schemaStr, 
timestamp).collect(Collectors.toList());
   }
 
   /**
@@ -796,8 +800,12 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
     return generateInserts(instantTime, n, false);
   }
 
+  public List<HoodieRecord> generateInserts(String instantTime, Integer n, 
long timestamp) {
+    return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, 
timestamp).collect(Collectors.toList());
+  }
+
   public List<HoodieRecord> generateInsertsNestedExample(String instantTime, 
Integer n) {
-    return generateInsertsStream(instantTime, n, false, 
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
+    return generateInsertsStream(instantTime, n, false, 
TRIP_NESTED_EXAMPLE_SCHEMA, 
System.currentTimeMillis()).collect(Collectors.toList());
   }
 
   /**
@@ -810,32 +818,35 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
    * @return  List of {@link HoodieRecord}s
    */
   public List<HoodieRecord> generateInserts(String instantTime, Integer n, 
boolean isFlattened) {
-    return generateInsertsStream(instantTime, n, isFlattened, isFlattened ? 
TRIP_FLATTENED_SCHEMA : TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
+    return generateInsertsStream(instantTime, n, isFlattened, isFlattened ? 
TRIP_FLATTENED_SCHEMA : TRIP_EXAMPLE_SCHEMA, 
System.currentTimeMillis()).collect(Collectors.toList());
   }
 
   /**
    * Generates new inserts, uniformly across the partition paths above. It 
also updates the list of existing keys.
    */
-  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer 
n, boolean isFlattened, String schemaStr) {
-    return generateInsertsStream(commitTime, n, isFlattened, schemaStr, false);
+  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer 
n, boolean isFlattened, String schemaStr, long timestamp) {
+    return generateInsertsStream(commitTime, n, isFlattened, schemaStr, false, 
timestamp);
   }
 
   public List<HoodieRecord> generateInsertsContainsAllPartitions(String 
instantTime, Integer n) {
     if (n < partitionPaths.length) {
       throw new HoodieIOException("n must greater then partitionPaths length");
     }
-    return generateInsertsStream(instantTime,  n, false, TRIP_EXAMPLE_SCHEMA, 
true).collect(Collectors.toList());
+    long timestamp = System.currentTimeMillis();
+    return generateInsertsStream(instantTime,  n, false, TRIP_EXAMPLE_SCHEMA, 
true, timestamp).collect(Collectors.toList());
   }
 
   public List<HoodieRecord> generateInsertsForPartitionPerSchema(String 
instantTime, Integer n, String partition, String schemaStr) {
-    return generateInsertsStream(instantTime,  n, false, schemaStr, false, () 
-> partition, () -> 
genPseudoRandomUUID(rand).toString()).collect(Collectors.toList());
+    long timestamp = System.currentTimeMillis();
+    return generateInsertsStream(instantTime,  n, false, schemaStr, false, () 
-> partition, () -> genPseudoRandomUUID(rand).toString(), 
timestamp).collect(Collectors.toList());
   }
 
   public List<HoodieRecord> generateInsertsForPartition(String instantTime, 
Integer n, String partition) {
-    return generateInsertsStream(instantTime,  n, false, TRIP_EXAMPLE_SCHEMA, 
false, () -> partition, () -> 
genPseudoRandomUUID(rand).toString()).collect(Collectors.toList());
+    long timestamp = System.currentTimeMillis();
+    return generateInsertsStream(instantTime,  n, false, TRIP_EXAMPLE_SCHEMA, 
false, () -> partition, () -> genPseudoRandomUUID(rand).toString(), 
timestamp).collect(Collectors.toList());
   }
 
-  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer 
n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
+  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer 
n, boolean isFlattened, String schemaStr, boolean containsAllPartitions, long 
timestamp) {
     AtomicInteger partitionIndex = new AtomicInteger(0);
     return generateInsertsStream(commitTime, n, isFlattened, schemaStr, 
containsAllPartitions,
         () -> {
@@ -844,14 +855,15 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
           partitionIndex.set((partitionIndex.get() + 1) % 
partitionPaths.length);
           return partitionToUse;
         },
-        () -> genPseudoRandomUUID(rand).toString());
+        () -> genPseudoRandomUUID(rand).toString(),
+        timestamp);
   }
 
   /**
    * Generates new inserts, uniformly across the partition paths above. It 
also updates the list of existing keys.
    */
   public Stream<HoodieRecord> generateInsertsStream(String instantTime, 
Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions,
-                                                    Supplier<String> 
partitionPathSupplier, Supplier<String> recordKeySupplier) {
+                                                    Supplier<String> 
partitionPathSupplier, Supplier<String> recordKeySupplier, long timestamp) {
     int currSize = getNumExistingKeys(schemaStr);
     return IntStream.range(0, n).boxed().map(i -> {
       String partitionPath = partitionPathSupplier.get();
@@ -865,7 +877,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
       populateKeysBySchema(schemaStr, currSize + i, kp);
       incrementNumExistingKeysBySchema(schemaStr);
       try {
-        return new HoodieAvroIndexedRecord(key, 
generateRandomValueAsPerSchema(schemaStr, key, instantTime, isFlattened),
+        return new HoodieAvroIndexedRecord(key, 
generateRandomValueAsPerSchema(schemaStr, key, instantTime, isFlattened, 
timestamp),
             null, 
Option.of(Collections.singletonMap("InputRecordCount_1506582000", "2")));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
@@ -905,8 +917,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
 
   public List<HoodieRecord> generateUpdatesWithHoodieAvroPayload(String 
instantTime, List<HoodieRecord> baseRecords) {
     List<HoodieRecord> updates = new ArrayList<>();
+    long timestamp = System.currentTimeMillis();
     for (HoodieRecord baseRecord : baseRecords) {
-      HoodieRecord record = new HoodieAvroIndexedRecord(baseRecord.getKey(), 
generateAvroPayload(baseRecord.getKey(), instantTime));
+      HoodieRecord record = new HoodieAvroIndexedRecord(baseRecord.getKey(), 
generateAvroPayload(baseRecord.getKey(), instantTime, timestamp));
       updates.add(record);
     }
     return updates;
@@ -1031,10 +1044,18 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
     return generateUniqueUpdatesStream(instantTime, n, 
TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
   }
 
+  public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer 
n, long timestamp) {
+    return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA, 
timestamp).collect(Collectors.toList());
+  }
+
   public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer 
n, String schemaStr) {
     return generateUniqueUpdatesStream(instantTime, n, 
schemaStr).collect(Collectors.toList());
   }
 
+  public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer 
n, String schemaStr, long timestamp) {
+    return generateUniqueUpdatesStream(instantTime, n, schemaStr, 
timestamp).collect(Collectors.toList());
+  }
+
   public List<HoodieRecord> generateUniqueUpdatesNestedExample(String 
instantTime, Integer n) {
     return generateUniqueUpdatesStream(instantTime, n, 
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
   }
@@ -1061,6 +1082,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
    * @return stream of hoodie record updates
    */
   public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, 
Integer n, String schemaStr) {
+    long timestamp = System.currentTimeMillis();
+    return generateUniqueUpdatesStream(instantTime, n, schemaStr, timestamp);
+  }
+
+  public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, 
Integer n, String schemaStr, long timestamp) {
     final Set<KeyPartition> used = new HashSet<>();
     int numExistingKeys = numKeysBySchema.getOrDefault(schemaStr, 0);
     Map<Integer, KeyPartition> existingKeys = 
existingKeysBySchema.get(schemaStr);
@@ -1079,7 +1105,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
       logger.debug("key getting updated: {}", kp.key.getRecordKey());
       used.add(kp);
       try {
-        return new HoodieAvroIndexedRecord(kp.key, 
generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false));
+        return new HoodieAvroIndexedRecord(kp.key, 
generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false, 
timestamp));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
@@ -1122,9 +1148,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
    *
    * @param instantTime Commit Timestamp
    * @param n          Number of unique records
+   * @param updatePartition whether to update the partition path while 
generating delete record
+   * @param timestamp timestamp to set in the record for the ordering value
    * @return stream of hoodie records for delete
    */
-  public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String 
instantTime, Integer n, boolean updatePartition) {
+  private Stream<HoodieRecord> generateUniqueDeleteRecordStream(String 
instantTime, Integer n, boolean updatePartition, long timestamp) {
     final Set<KeyPartition> used = new HashSet<>();
     Map<Integer, KeyPartition> existingKeys = 
existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
     Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
@@ -1151,7 +1179,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
         key = new HoodieKey(key.getRecordKey(), updatedPartitionPath);
       }
       try {
-        result.add(new HoodieAvroIndexedRecord(key, 
generateRandomDeleteValue(key, instantTime)));
+        result.add(new HoodieAvroIndexedRecord(key, 
generateRandomDeleteValue(key, instantTime, timestamp)));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
@@ -1168,11 +1196,19 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
    * @return List of hoodie records for delete
    */
   public List<HoodieRecord> generateUniqueDeleteRecords(String instantTime, 
Integer n) {
-    return generateUniqueDeleteRecordStream(instantTime, n, 
false).collect(Collectors.toList());
+    return generateUniqueDeleteRecordStream(instantTime, n, false, 
System.currentTimeMillis()).collect(Collectors.toList());
   }
 
   public List<HoodieRecord> 
generateUniqueDeleteRecordsWithUpdatedPartition(String instantTime, Integer n) {
-    return generateUniqueDeleteRecordStream(instantTime, n, 
true).collect(Collectors.toList());
+    return generateUniqueDeleteRecordStream(instantTime, n, true, 
System.currentTimeMillis()).collect(Collectors.toList());
+  }
+
+  public List<HoodieRecord> generateUniqueDeleteRecords(String instantTime, 
Integer n, long timestamp) {
+    return generateUniqueDeleteRecordStream(instantTime, n, false, 
timestamp).collect(Collectors.toList());
+  }
+
+  public List<HoodieRecord> 
generateUniqueDeleteRecordsWithUpdatedPartition(String instantTime, Integer n, 
long timestamp) {
+    return generateUniqueDeleteRecordStream(instantTime, n, true, 
timestamp).collect(Collectors.toList());
   }
 
   public boolean deleteExistingKeyIfPresent(HoodieKey key) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index fa2cce1cd347..c78292f4b45e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -64,7 +64,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Test cases for {@link StreamReadMonitoringFunction}.
  */
 public class TestStreamReadMonitoringFunction {
-  private static final long WAIT_TIME_MILLIS = 5 * 1000L;
+  private static final long WAIT_TIME_MILLIS = 10 * 1000L;
 
   private Configuration conf;
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index a3c70eab57b2..681442274893 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -281,9 +281,10 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
       commitToTable(initialRecords, UPSERT.value(), true, writeConfigs, 
TRIP_EXAMPLE_SCHEMA);
+      String[] orderingFields = recordMergeMode == 
RecordMergeMode.COMMIT_TIME_ORDERING ? new String[0] : new 
String[]{ORDERING_FIELD_NAME};
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
-          initialRecords, initialRecords, new String[]{ORDERING_FIELD_NAME});
+          initialRecords, initialRecords, orderingFields);
 
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
@@ -291,7 +292,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
       commitToTable(updates, UPSERT.value(), false, writeConfigs, 
TRIP_EXAMPLE_SCHEMA);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
-          allRecords, CollectionUtils.combine(initialRecords, updates), new 
String[] {ORDERING_FIELD_NAME});
+          allRecords, CollectionUtils.combine(initialRecords, updates), 
orderingFields);
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 05a31fad7c3a..d5a6cf995f1c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -72,20 +72,21 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
                                                      validate: Boolean = true,
                                                      numUpdates: Int = 1,
                                                      onlyUpdates: Boolean = 
false,
-                                                     schemaStr: String = 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA): DataFrame = {
+                                                     schemaStr: String = 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
+                                                     timestamp: Long = 
System.currentTimeMillis()): DataFrame = {
     var latestBatch: mutable.Buffer[String] = null
     if (operation == UPSERT_OPERATION_OPT_VAL) {
       val instantTime = getInstantTime()
-      val records = 
recordsToStrings(dataGen.generateUniqueUpdates(instantTime, numUpdates, 
schemaStr))
+      val records = 
recordsToStrings(dataGen.generateUniqueUpdates(instantTime, numUpdates, 
schemaStr, timestamp))
       if (!onlyUpdates) {
-        
records.addAll(recordsToStrings(dataGen.generateInsertsAsPerSchema(instantTime, 
1, schemaStr)))
+        
records.addAll(recordsToStrings(dataGen.generateInsertsAsPerSchema(instantTime, 
1, schemaStr, timestamp)))
       }
       latestBatch = records.asScala
     } else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
       latestBatch = 
recordsToStrings(dataGen.generateInsertsForPartitionPerSchema(
         getInstantTime(), 5, dataGen.getPartitionPaths.last, 
schemaStr)).asScala
     } else {
-      latestBatch = 
recordsToStrings(dataGen.generateInsertsAsPerSchema(getInstantTime(), 5, 
schemaStr)).asScala
+      latestBatch = 
recordsToStrings(dataGen.generateInsertsAsPerSchema(getInstantTime(), 5, 
schemaStr, timestamp)).asScala
     }
     val latestBatchDf = 
spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2))
     latestBatchDf.cache()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 7e84fc0bc978..f49284704fbb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -185,6 +185,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     // Insert Operation
     var records = recordsToStrings(dataGen.generateInserts("003", 
100)).asScala.toList
     var inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF = inputDF.withColumn("timestamp", lit(10))
 
     val commonOptsWithMultipleOrderingFields = writeOpts ++ Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
@@ -203,6 +204,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
 
     records = recordsToStrings(dataGen.generateUniqueUpdates("002", 
10)).asScala.toList
     inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF = inputDF.withColumn("timestamp", lit(10))
     inputDF.write.format("hudi")
       .options(commonOptsWithMultipleOrderingFields)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -212,6 +214,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
 
     records = recordsToStrings(dataGen.generateUniqueUpdates("004", 
10)).asScala.toList
     inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF = inputDF.withColumn("timestamp", lit(10))
     inputDF.write.format("hudi")
       .options(commonOptsWithMultipleOrderingFields)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -221,7 +224,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
 
     records = recordsToStrings(dataGen.generateUniqueUpdates("001", 
10)).asScala.toList
     inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
-    inputDF = inputDF.withColumn("timestamp", lit(1))
+    inputDF = inputDF.withColumn("timestamp", lit(20))
     inputDF.write.format("hudi")
       .options(commonOptsWithMultipleOrderingFields)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 7a9263ce58de..4fd01a58ecc0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1776,6 +1776,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     // Insert Operation
     var records = recordsToStrings(dataGen.generateInserts("003", 
100)).asScala.toList
     var inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF = inputDF.withColumn("timestamp", lit(10))
 
     val commonOptsWithMultipleOrderingFields = writeOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
@@ -1795,6 +1796,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
 
     records = recordsToStrings(dataGen.generateUniqueUpdates("002", 
10)).asScala.toList
     inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF = inputDF.withColumn("timestamp", lit(10))
     inputDF.write.format("hudi")
       .options(commonOptsWithMultipleOrderingFields)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -1804,6 +1806,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
 
     records = recordsToStrings(dataGen.generateUniqueUpdates("004", 
10)).asScala.toList
     inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF = inputDF.withColumn("timestamp", lit(10))
     inputDF.write.format("hudi")
       .options(commonOptsWithMultipleOrderingFields)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -1813,7 +1816,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
 
     records = recordsToStrings(dataGen.generateUniqueUpdates("001", 
10)).asScala.toList
     inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
-    inputDF = inputDF.withColumn("timestamp", lit(1))
+    inputDF = inputDF.withColumn("timestamp", lit(20))
     inputDF.write.format("hudi")
       .options(commonOptsWithMultipleOrderingFields)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
index 776e27381703..7dfcb7c56c99 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
@@ -26,9 +26,9 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.data.HoodieListData
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecordGlobalLocation, 
HoodieTableType}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordGlobalLocation, 
HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
InProcessTimeGenerator}
 import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
@@ -43,8 +43,9 @@ import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertTrue, fail}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, 
ValueSource}
 
+import java.util
 import java.util.stream.Collectors
 
 import scala.collection.JavaConverters
@@ -76,7 +77,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
       HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
       HoodieIndexConfig.INDEX_TYPE.key() -> PARTITIONED_RECORD_INDEX.name())
     holder.options = options
-    insertDf.write.format("org.apache.hudi")
+    insertDf.write.format("hudi")
       .options(options)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -99,11 +100,13 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
 
     val newDeletes =  dataGen.generateUpdates("004", 1)
     val updates =  dataGen.generateUniqueUpdates("002", 3)
+    val lowerOrderingValue = 1L
+    updates.addAll(dataGen.generateUniqueDeleteRecords("002", 2, 
lowerOrderingValue))
     val nextBatch = recordsToStrings(updates).asScala.toSeq
     val nextBatchDf = 
spark.read.json(spark.sparkContext.parallelize(nextBatch, 1))
     val updateDf = nextBatchDf.withColumn("data_partition_path", 
lit("partition1"))
 
-    updateDf.write.format("org.apache.hudi")
+    updateDf.write.format("hudi")
       .options(options)
       .option(DataSourceWriteOptions.OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -121,7 +124,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
     val newInsertBatch = recordsToStrings(newInserts).asScala.toSeq
     val newInsertBatchDf = 
spark.read.json(spark.sparkContext.parallelize(newInsertBatch, 1))
     val newInsertDf = newInsertBatchDf.withColumn("data_partition_path", 
lit("partition2")).union(newInsertBatchDf.withColumn("data_partition_path", 
lit("partition3")))
-    newInsertDf.write.format("org.apache.hudi")
+    newInsertDf.write.format("hudi")
       .options(options)
       .option(DataSourceWriteOptions.OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -149,7 +152,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
     val newDeletesBatch = recordsToStrings(newDeletes).asScala.toSeq
     val newDeletesBatchDf = 
spark.read.json(spark.sparkContext.parallelize(newDeletesBatch, 1))
     val newDeletesDf = newDeletesBatchDf.withColumn("data_partition_path", 
lit("partition1"))
-    newDeletesDf.write.format("org.apache.hudi")
+    newDeletesDf.write.format("hudi")
       .options(options)
       .option(DataSourceWriteOptions.OPERATION.key(), DELETE_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -181,7 +184,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
     val bulkInsertPartitionedDf = 
bulkInsertDf.withColumn("data_partition_path", lit("partition0"))
 
     // Use bulk_insert operation explicitly
-    bulkInsertPartitionedDf.write.format("org.apache.hudi")
+    bulkInsertPartitionedDf.write.format("hudi")
       .options(options)
       .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -348,7 +351,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
       HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() -> 
streamingWriteEnabled.toString,
       HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
       HoodieIndexConfig.INDEX_TYPE.key() -> PARTITIONED_RECORD_INDEX.name())
-    insertDf.write.format("org.apache.hudi")
+    insertDf.write.format("hudi")
       .options(options)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -359,7 +362,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
     val nextBatch = recordsToStrings(updates).asScala.toSeq
     val nextBatchDf = 
spark.read.json(spark.sparkContext.parallelize(nextBatch, 1))
     val updateDf = nextBatchDf.withColumn("data_partition_path", 
lit("partition1"))
-    updateDf.write.format("org.apache.hudi")
+    updateDf.write.format("hudi")
       .options(options)
       .option(DataSourceWriteOptions.OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -374,7 +377,7 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
       val batchToFail = recordsToStrings(updatesToFail).asScala.toSeq
       val batchToFailDf = 
spark.read.json(spark.sparkContext.parallelize(batchToFail, 1))
       val failDf = batchToFailDf.withColumn("data_partition_path", 
lit("partition1")).union(batchToFailDf.withColumn("data_partition_path", 
lit("partition3")))
-      failDf.write.format("org.apache.hudi")
+      failDf.write.format("hudi")
         .options(options)
         .option(DataSourceWriteOptions.OPERATION.key(), 
UPSERT_OPERATION_OPT_VAL)
         .mode(SaveMode.Append)
@@ -444,6 +447,37 @@ class TestPartitionedRecordLevelIndex extends 
RecordLevelIndexTestBase {
     
metadata.readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys), 
dataTablePartition)
       .collectAsList().asScala.map(p => p.getKey -> p.getValue).toMap
   }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): 
Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name()) +
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> 
"false")
+    val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    insertDf.cache()
+
+    val instantTime = InProcessTimeGenerator.createNewInstantTime()
+    // Issue two deletes, where one has an older ordering value that should be 
ignored
+    val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1)
+    val inputRecords = new util.ArrayList[HoodieRecord[_]](deletedRecords)
+    val lowerOrderingValue = 1L
+    inputRecords.addAll(dataGen.generateUniqueDeleteRecords(instantTime, 1, 
lowerOrderingValue))
+    val deleteBatch = recordsToStrings(inputRecords).asScala
+    val deleteDf = 
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
+    deleteDf.cache()
+    val recordKeyToDelete = 
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
+    deleteDf.write.format("hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val prevDf = mergedDfList.last
+    mergedDfList = mergedDfList :+ prevDf.filter(row => 
row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete)
+    validateDataAndRecordIndices(hudiOpts, 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq,
 1)))
+    deleteDf.unpersist()
+  }
 }
 
 object TestPartitionedRecordLevelIndex {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 95a06b3f9484..93e3fcda891b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import 
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
InProcessTimeGenerator}
 import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
@@ -47,7 +47,6 @@ import org.junit.jupiter.params.provider.Arguments.arguments
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.Executors
-import java.util.stream.Collectors
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -85,7 +84,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     var operation = INSERT_OPERATION_OPT_VAL
     val latestBatchDf = 
spark.read.json(spark.sparkContext.parallelize(latestBatch, 1))
     latestBatchDf.cache()
-    latestBatchDf.write.format("org.apache.hudi")
+    latestBatchDf.write.format("hudi")
       .options(hudiOpts)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -103,7 +102,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
{
     val latestBatchDf2_3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch2_3, 1))
     val latestBatchDf2Final = latestBatchDf2_3.union(latestBatchDf2_2)
     latestBatchDf2Final.cache()
-    latestBatchDf2Final.write.format("org.apache.hudi")
+    latestBatchDf2Final.write.format("hudi")
       .options(hudiOpts)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -123,14 +122,16 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase {
     val latestBatch3 = 
recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
     val latestBatchDf3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
     latestBatchDf3.cache()
-    latestBatchDf.write.format("org.apache.hudi")
+    latestBatchDf3.write.format("hudi")
       .options(hudiOpts2)
       .mode(SaveMode.Append)
       .save(basePath)
-    val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
+    val deletedDf3 = calculateMergedDf(latestBatchDf3, operation, true)
     deletedDf3.cache()
     metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     validateDataAndRecordIndices(hudiOpts, deletedDf3)
+    deletedDf2.unpersist()
+    deletedDf3.unpersist()
   }
 
   private def getNewInstantTime(): String = {
@@ -248,7 +249,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
{
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite)
     val deleteDf = insertDf.limit(1)
-    deleteDf.write.format("org.apache.hudi")
+    deleteDf.write.format("hudi")
       .options(hudiOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -291,15 +292,54 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase {
     insertDf.cache()
 
     val instantTime = getNewInstantTime
-    // Issue two deletes, one with the original partition and one with an 
updated partition
-    val recordsToDelete = dataGen.generateUniqueDeleteRecords(instantTime, 1)
-    
recordsToDelete.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
 1))
-    val deleteBatch = recordsToStrings(recordsToDelete).asScala
+    // Issue four deletes, one with the original partition, one with an 
updated partition,
+    // and two with an older ordering value that should be ignored
+    val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1)
+    
deletedRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
 1))
+    val inputRecords = new util.ArrayList[HoodieRecord[_]](deletedRecords)
+    val lowerOrderingValue = 1L
+    inputRecords.addAll(dataGen.generateUniqueDeleteRecords(instantTime, 1, 
lowerOrderingValue))
+    
inputRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
 1, lowerOrderingValue))
+    val deleteBatch = recordsToStrings(inputRecords).asScala
     val deleteDf = 
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
     deleteDf.cache()
     val recordKeyToDelete1 = 
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
     val recordKeyToDelete2 = 
deleteDf.collectAsList().get(1).getAs("_row_key").asInstanceOf[String]
-    deleteDf.write.format("org.apache.hudi")
+    deleteDf.write.format("hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val prevDf = mergedDfList.last
+    mergedDfList = mergedDfList :+ prevDf.filter(row => 
row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete1 &&
+      row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete2)
+    validateDataAndRecordIndices(hudiOpts, 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq,
 1)))
+    deleteDf.unpersist()
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIForDeletesWithCommitTimeOrdering(tableType: HoodieTableType): 
Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name()) +
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> 
"true") +
+      (HoodieTableConfig.ORDERING_FIELDS.key -> "")
+    val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    insertDf.cache()
+
+    val instantTime = getNewInstantTime
+    val lowerOrderingValue = 1L
+    // Issue two deletes, one with the original partition, one with an updated 
partition,
+    // Both have an older ordering value but that is ignored since the table 
uses commit time ordering
+    val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1, 
lowerOrderingValue)
+    
deletedRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
 1, lowerOrderingValue))
+    val deleteBatch = recordsToStrings(deletedRecords).asScala
+    val deleteDf = 
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
+    deleteDf.cache()
+    val recordKeyToDelete1 = 
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
+    val recordKeyToDelete2 = 
deleteDf.collectAsList().get(1).getAs("_row_key").asInstanceOf[String]
+    deleteDf.write.format("hudi")
       .options(hudiOpts)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -658,13 +698,15 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase {
 
     val executor = Executors.newFixedThreadPool(2)
     implicit val executorContext: ExecutionContext = 
ExecutionContext.fromExecutor(executor)
+    val timestamp = System.currentTimeMillis()
     val function = new Function0[Boolean] {
       override def apply(): Boolean = {
         try {
           doWriteAndValidateDataAndRecordIndex(hudiOpts,
             operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
             saveMode = SaveMode.Append,
-            validate = false)
+            validate = false,
+            timestamp = timestamp)
           true
         } catch {
           case _: HoodieWriteConflictException => false
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index a5a72e395aa7..7edd11804c91 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -358,11 +358,11 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     HoodieTestDataGenerator dataGenerator = new 
HoodieTestDataGenerator(makeDatesAmbiguous);
     if (useCustomSchema) {
       Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-          dataGenerator.generateInsertsAsPerSchema("000", numRecords, 
schemaStr),
+          dataGenerator.generateInsertsAsPerSchema("000", numRecords, 
schemaStr, 0L),
           schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
     } else {
       Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-          dataGenerator.generateInserts("000", numRecords)), new Path(path));
+          dataGenerator.generateInserts("000", numRecords, 0L)), new 
Path(path));
     }
     return dataGenerator;
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 0600d28a5dae..fc75e76275af 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -159,8 +160,8 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     testUtils.createTopic(topicName2, 2);
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA, 0L)));
+    testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA, 0L)));
 
     HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config", 
JsonKafkaSource.class.getName(), false, false, null);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -185,8 +186,10 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     assertRecordCount(10, targetBasePath2, sqlContext);
 
     //insert updates for already existing records in kafka topics
-    testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateUniqueUpdatesStream("001", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA, 0L)
+        .collect(Collectors.toList())));
+    testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateUniqueUpdatesStream("001", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA, 0L)
+        .collect(Collectors.toList())));
 
     streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     streamer.getTableExecutionContexts().get(1).setProperties(properties);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
index 777c7f237854..cbe823dd07f0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
@@ -137,15 +137,15 @@ public abstract class AbstractBaseTestSource extends 
AvroSource {
     if (!reachedMax && numUpdates >= 50) {
       LOG.info("After adjustments => NumInserts={}, NumUpdates={}, 
NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50), 
maxUniqueKeys);
       // if we generate update followed by deletes -> some keys in update 
batch might be picked up for deletes. Hence generating delete batch followed by 
updates
-      deleteStream = 
dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50, 
false).map(AbstractBaseTestSource::toGenericRecord);
-      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+      deleteStream = dataGenerator.generateUniqueDeleteRecords(instantTime, 
50, 0L).stream().map(AbstractBaseTestSource::toGenericRecord);
+      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 0L)
           .map(AbstractBaseTestSource::toGenericRecord);
     } else {
       LOG.info("After adjustments => NumInserts={}, NumUpdates={}, 
maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys);
-      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 0L)
           .map(AbstractBaseTestSource::toGenericRecord);
     }
-    Stream<GenericRecord> insertStream = 
dataGenerator.generateInsertsStream(instantTime, numInserts, false, 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+    Stream<GenericRecord> insertStream = 
dataGenerator.generateInsertsStream(instantTime, numInserts, false, 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 0L)
         .map(AbstractBaseTestSource::toGenericRecord);
     if 
(Boolean.valueOf(props.getOrDefault("hoodie.test.source.generate.inserts", 
"false").toString())) {
       return insertStream;

Reply via email to