This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0e05e3a1c7618b3f8ede81331c16c328b1abe346
Author: Danny Chan <[email protected]>
AuthorDate: Fri Apr 11 07:32:53 2025 +0800

    [HUDI-9267] Fix the file group reader log file read sequence (#13115)
    
    * [HUDI-9267] Fix the file group reader log file read sequence
    
    Fix the file group reader log file sequence to be in ascending order, so 
as to keep the "processing_time"
    merging semantics for streaming scenarios: always choose the latest 
incoming record if the ordering values are equal.
    
    This semantics works now for both `COMMIT_TIME` and `EVENT_TIME` merging 
modes after the fix.
    
    Also fix some other issues:
    
    * the unnecessary copy of rows for position based merging;
    * the event time merging sequence for CUSTOM merger.
    * the HoodieEmptyRecord default ordering value
    * the fallback strategy read for position based merging
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
    (cherry picked from commit 0eac5551b91e8ac0fa8cdf7268677d277ceb4273)
---
 .../hudi/common/model/HoodieEmptyRecord.java       |   5 +-
 .../table/log/BaseHoodieLogRecordReader.java       |   4 +-
 .../common/table/read/FileGroupRecordBuffer.java   | 106 ++++++++++++++-------
 .../read/PositionBasedFileGroupRecordBuffer.java   |  22 +++--
 .../hudi/common/table/read/TestCustomMerger.java   |   3 +-
 .../TestPositionBasedFileGroupRecordBuffer.java    |   6 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   2 +-
 7 files changed, 102 insertions(+), 46 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index bbe55171682..8183bde74d2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -39,7 +39,10 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   public HoodieEmptyRecord(HoodieKey key, HoodieRecordType type) {
     super(key, null);
     this.type = type;
-    this.orderingVal = null;
+    // IMPORTANT:
+    // This should be kept in line with EmptyHoodieRecordPayload
+    // default natural order
+    this.orderingVal = 0;
   }
 
   public HoodieEmptyRecord(HoodieKey key, HoodieOperation operation, 
Comparable<?> orderingVal, HoodieRecordType type) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 71d4cd2842c..fe92f895843 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -223,13 +223,13 @@ public abstract class BaseHoodieLogRecordReader<T> {
     totalCorruptBlocks = new AtomicLong(0);
     totalLogBlocks = new AtomicLong(0);
     totalLogRecords = new AtomicLong(0);
-    HoodieLogFormatReverseReader logFormatReaderWrapper = null;
+    HoodieLogFormatReader logFormatReaderWrapper = null;
     HoodieTimeline commitsTimeline = 
this.hoodieTableMetaClient.getCommitsTimeline();
     HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReverseReader(storage,
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
StoragePath(logFile))).collect(Collectors.toList()),
           readerSchema, reverseReader, bufferSize, shouldLookupRecords(), 
recordKeyField, internalSchema);
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 15c22883716..a5e7730534d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -233,13 +233,14 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   /**
    * Merge two log data records if needed.
    *
-   * @param record
-   * @param metadata
-   * @param existingRecordMetadataPair
-   * @return
-   * @throws IOException
+   * @param newRecord                  The new incoming record
+   * @param metadata                   The metadata
+   * @param existingRecordMetadataPair The existing record metadata pair
+   *
+   * @return The pair of the record that needs to be updated with and its 
metadata,
+   * returns empty to skip the update.
    */
-  protected Option<Pair<Option<T>, Map<String, Object>>> 
doProcessNextDataRecord(T record,
+  protected Option<Pair<Option<T>, Map<String, Object>>> 
doProcessNextDataRecord(T newRecord,
                                                                                
  Map<String, Object> metadata,
                                                                                
  Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
       throws IOException {
@@ -249,14 +250,12 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
         // TODO(HUDI-7843): decouple the merging logic from the merger
         //  and use the record merge mode to control how to merge partial 
updates
         // Merge and store the combined record
-        // Note that the incoming `record` is from an older commit, so it 
should be put as
-        // the `older` in the merge API
         Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.get().partialMerge(
-            readerContext.constructHoodieRecord(Option.of(record), metadata),
-            readerContext.getSchemaFromMetadata(metadata),
             readerContext.constructHoodieRecord(
                 existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
             
readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+            readerContext.constructHoodieRecord(Option.of(newRecord), 
metadata),
+            readerContext.getSchemaFromMetadata(metadata),
             readerSchema,
             props);
         if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -266,7 +265,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
         HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
 
         // If pre-combine returns existing record, no need to update it
-        if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().get()) {
+        if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().orElse(null)) {
           return Option.of(Pair.of(
               Option.ofNullable(combinedRecord.getData()),
               
readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, 
combinedRecordAndSchema.getRight())));
@@ -275,43 +274,47 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
-            return Option.empty();
+            return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
           case EVENT_TIME_ORDERING:
-            Comparable existingOrderingValue = readerContext.getOrderingValue(
-                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(),
-                readerSchema, orderingFieldName);
-            if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingValue)) {
-              return Option.empty();
-            }
-            Comparable incomingOrderingValue = readerContext.getOrderingValue(
-                Option.of(record), metadata, readerSchema, orderingFieldName);
-            if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
-              return Option.of(Pair.of(Option.of(record), metadata));
+            if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) 
{
+              return Option.of(Pair.of(Option.of(newRecord), metadata));
             }
             return Option.empty();
           case CUSTOM:
           default:
             // Merge and store the combined record
-            // Note that the incoming `record` is from an older commit, so it 
should be put as
-            // the `older` in the merge API
             if (payloadClass.isPresent()) {
+              if (existingRecordMetadataPair.getLeft().isEmpty()
+                  && 
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) 
{
+                // IMPORTANT:
+                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
+                // return Option.empty when the old payload data is empty(a 
delete) and ignores its ordering value directly.
+                return Option.of(Pair.of(Option.of(newRecord), metadata));
+              }
               Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
-                  getMergedRecord(Option.of(record), metadata, 
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight());
+                  getMergedRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.of(newRecord), metadata);
               if (combinedRecordAndSchemaOpt.isPresent()) {
                 T combinedRecordData = 
readerContext.convertAvroRecord((IndexedRecord) 
combinedRecordAndSchemaOpt.get().getLeft().getData());
                 // If pre-combine does not return existing record, update it
-                if (combinedRecordData != 
existingRecordMetadataPair.getLeft().get()) {
+                if (combinedRecordData != 
existingRecordMetadataPair.getLeft().orElse(null)) {
                   return 
Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
                 }
               }
               return Option.empty();
             } else {
+              if (existingRecordMetadataPair.getLeft().isEmpty()
+                  && 
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) 
{
+                // IMPORTANT:
+                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
+                // return Option.empty when the old payload data is empty(a 
delete) and ignores its ordering value directly.
+                return Option.of(Pair.of(Option.of(newRecord), metadata));
+              }
               Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.get().merge(
-                  readerContext.constructHoodieRecord(Option.of(record), 
metadata),
-                  readerContext.getSchemaFromMetadata(metadata),
                   readerContext.constructHoodieRecord(
                       existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
                   
readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+                  readerContext.constructHoodieRecord(Option.of(newRecord), 
metadata),
+                  readerContext.getSchemaFromMetadata(metadata),
                   props);
 
               if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -322,7 +325,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
               HoodieRecord<T> combinedRecord = 
combinedRecordAndSchema.getLeft();
 
               // If pre-combine returns existing record, no need to update it
-              if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().get()) {
+              if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().orElse(null)) {
                 return 
Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
               }
               return Option.empty();
@@ -334,16 +337,17 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
       // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
       //       it since these records will be put into records(Map).
-      return Option.of(Pair.of(Option.ofNullable(record), metadata));
+      return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
     }
   }
 
   /**
    * Merge a delete record with another record (data, or delete).
    *
-   * @param deleteRecord
-   * @param existingRecordMetadataPair
-   * @return
+   * @param deleteRecord               The delete record
+   * @param existingRecordMetadataPair The existing record metadata pair
+   *
+   * @return The option of new delete record that needs to be updated with.
    */
   protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord 
deleteRecord,
                                                             Pair<Option<T>, 
Map<String, Object>> existingRecordMetadataPair) {
@@ -351,7 +355,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     if (existingRecordMetadataPair != null) {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return Option.empty();
+          return Option.of(deleteRecord);
         case EVENT_TIME_ORDERING:
         case CUSTOM:
         default:
@@ -473,6 +477,17 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
         case CUSTOM:
         default:
           if (payloadClass.isPresent()) {
+            if (older.isEmpty() || newer.isEmpty()) {
+              if (shouldKeepNewerRecord(older, olderInfoMap, newer, 
newerInfoMap)) {
+                // IMPORTANT:
+                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
+                // return Option.empty when the new payload data is empty(a 
delete) and ignores its ordering value directly.
+                return newer;
+              } else {
+                return older;
+              }
+            }
+
             Option<Pair<HoodieRecord, Schema>> mergedRecord =
                 getMergedRecord(older, olderInfoMap, newer, newerInfoMap);
             if (mergedRecord.isPresent()
@@ -487,6 +502,16 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
             }
             return Option.empty();
           } else {
+            if (older.isEmpty() || newer.isEmpty()) {
+              if (shouldKeepNewerRecord(older, olderInfoMap, newer, 
newerInfoMap)) {
+                // IMPORTANT:
+                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
+                // return Option.empty when the new payload data is empty(a 
delete) and ignores its ordering value directly.
+                return newer;
+              } else {
+                return older;
+              }
+            }
             Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().merge(
                 readerContext.constructHoodieRecord(older, olderInfoMap), 
readerContext.getSchemaFromMetadata(olderInfoMap),
                 readerContext.constructHoodieRecord(newer, newerInfoMap), 
readerContext.getSchemaFromMetadata(newerInfoMap), props);
@@ -504,6 +529,19 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     }
   }
 
+  /**
+   * Decides whether to keep the incoming record with ordering value 
comparison.
+   */
+  private boolean shouldKeepNewerRecord(Option<T> oldVal, Map<String, Object> 
oldMetadata, Option<T> newVal, Map<String, Object> newMetadata) {
+    Comparable newOrderingVal = readerContext.getOrderingValue(newVal, 
newMetadata, readerSchema, orderingFieldName);
+    if (isDeleteRecordWithNaturalOrder(newVal, newOrderingVal)) {
+      // handle records coming from DELETE statements(the orderingVal is 
constant 0)
+      return true;
+    }
+    Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, 
oldMetadata, readerSchema, orderingFieldName);
+    return newOrderingVal.compareTo(oldOrderingVal) >= 0;
+  }
+
   private Option<Pair<HoodieRecord, Schema>> getMergedRecord(Option<T> older, 
Map<String, Object> olderInfoMap, Option<T> newer, Map<String, Object> 
newerInfoMap) throws IOException {
     ValidationUtils.checkArgument(!Objects.equals(payloadClass, 
OverwriteWithLatestAvroPayload.class.getCanonicalName())
         && !Objects.equals(payloadClass, 
DefaultHoodieRecordPayload.class.getCanonicalName()));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index bd450b77d31..6b2f4d2c656 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -54,7 +54,6 @@ import java.util.Set;
 import java.util.function.Function;
 
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
-import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 
 /**
  * A buffer that is used to store log records by {@link 
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
@@ -184,10 +183,22 @@ public class PositionBasedFileGroupRecordBuffer<T> 
extends KeyBasedFileGroupReco
 
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
+        int commitTimeBasedRecordIndex = 0;
+        DeleteRecord[] deleteRecords = deleteBlock.getRecordsToDelete();
         for (Long recordPosition : recordPositions) {
-          records.putIfAbsent(recordPosition,
+          // IMPORTANT:
+          // use #put for log files with regular order(see 
HoodieLogFile.LOG_FILE_COMPARATOR);
+          // use #putIfAbsent for log files with reverse order(see 
HoodieLogFile.LOG_FILE_COMPARATOR_REVERSED),
+          // the delete block would be parsed ahead of a data block if they 
are in different log files.
+
+          // set up the record key for key-based fallback handling, this is 
needed
+          // because under hybrid strategy in #doHasNextFallbackBaseRecord, if 
the record keys are not set up,
+          // this delete-vector could be kept in the records cache(see the 
check in #fallbackToKeyBasedBuffer),
+          // and these keys would be deleted no matter whether there are 
following-up inserts/updates.
+          DeleteRecord deleteRecord = 
deleteRecords[commitTimeBasedRecordIndex++];
+          records.put(recordPosition,
               Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-                  null, "", DEFAULT_ORDERING_VALUE)));
+                  deleteRecord.getRecordKey(), "", 
deleteRecord.getOrderingValue())));
         }
         return;
       case EVENT_TIME_ORDERING:
@@ -246,12 +257,11 @@ public class PositionBasedFileGroupRecordBuffer<T> 
extends KeyBasedFileGroupReco
     Map<String, Object> metadata = readerContext.generateMetadataForRecord(
         baseRecord, readerSchema);
 
-    Option<T> resultRecord = Option.empty();
+    final Option<T> resultRecord;
     if (logRecordInfo != null) {
       resultRecord = merge(
           Option.of(baseRecord), metadata, logRecordInfo.getLeft(), 
logRecordInfo.getRight());
       if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
         readStats.incrementNumUpdates();
       } else {
         readStats.incrementNumDeletes();
@@ -275,7 +285,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
           ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
       Pair<Option<T>, Map<String, Object>> logRecordInfo  = 
records.remove(nextRecordPosition++);
       if (logRecordInfo != null) {
-        //we have a delete that was not able to be converted. Since it is the 
newest version, the record is deleted
+        //we have a delete that was not to be able to be converted. Since it 
is the newest version, the record is deleted
         //remove a key based record if it exists
         records.remove(readerContext.getRecordKey(baseRecord, readerSchema));
         return false;
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 31d0700935b..6d7f5a468d3 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -168,8 +168,9 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
   public void testWithThreeLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
     shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions, useRecordPositions);
     ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4, 
useRecordPositions);
+    // The records with keys 6 and 8 are deletes with lower ordering val
     List<String> leftKeysExpected =
-        Arrays.asList("1", "3", "7", "9", "10");
+        Arrays.asList("1", "3", "6", "7", "8", "9", "10");
     List<String> leftKeysActual = new ArrayList<>();
     while (iterator.hasNext()) {
       leftKeysActual.add(iterator.next()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index c927893a115..64e5bf1500c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -62,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
@@ -203,7 +204,10 @@ public class TestPositionBasedFileGroupRecordBuffer 
extends TestHoodieFileGroupR
     if (sameBaseInstantTime) {
       // If the log block's base instant time of record positions match the 
base file
       // to merge, the log records are stored based on the position
-      
assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+      
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY),
+          "the record key is set up for fallback handling");
+      
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_ORDERING_FIELD),
+          "the ordering value is set up for fallback handling");
     } else {
       // If the log block's base instant time of record positions does not 
match the
       // base file to merge, the log records are stored based on the record key
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 4120956b4ce..b9f71c687d2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -269,7 +269,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     val columnsToCompare = Set("ts", "key", "rider", "driver", "fare", "op")
     val df = spark.read.options(readOpts).format("hudi").load(getBasePath)
     val finalDf = df.select("ts", "key", "rider", "driver", "fare", 
"op").sort("key")
-    val expected = if (mergeMode == 
RecordMergeMode.EVENT_TIME_ORDERING.name()) {
+    val expected = if (mergeMode != 
RecordMergeMode.COMMIT_TIME_ORDERING.name()) {
       expectedEventTimeBased
     } else {
       expectedCommitTimeBased

Reply via email to